gateway/modules/services/serviceAi/mainServiceAi.py
2025-11-30 22:15:14 +01:00

1195 lines
55 KiB
Python

import json
import logging
import re
import time
from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import PromptPlaceholder
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.shared.jsonUtils import (
extractJsonString,
repairBrokenJson,
extractSectionsFromDocument,
buildContinuationContext,
parseJsonWithModel
)
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
from modules.datamodels.datamodelAi import JsonAccumulationState
logger = logging.getLogger(__name__)
# Rebuild the model to resolve forward references
AiCallRequest.model_rebuild()
class AiService:
"""AI service with core operations integrated."""
def __init__(self, serviceCenter=None) -> None:
"""Initialize AI service with service center access.
Args:
serviceCenter: Service center instance for accessing other services
"""
self.services = serviceCenter
# Only depend on interfaces
self.aiObjects = None # Will be initialized in create() or ensureAiObjectsInitialized()
# Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready
self.extractionService = None
def _initializeSubmodules(self):
"""Initialize all submodules after aiObjects is ready."""
if self.aiObjects is None:
raise RuntimeError("aiObjects must be initialized before initializing submodules")
if self.extractionService is None:
logger.info("Initializing ExtractionService...")
self.extractionService = ExtractionService(self.services)
async def ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized and submodules are ready."""
if self.aiObjects is None:
logger.info("Lazy initializing AiObjects...")
self.aiObjects = await AiObjects.create()
logger.info("AiObjects initialization completed")
# Initialize submodules after aiObjects is ready
self._initializeSubmodules()
@classmethod
async def create(cls, serviceCenter=None) -> "AiService":
"""Create AiService instance with all connectors and submodules initialized."""
logger.info("AiService.create() called")
instance = cls(serviceCenter)
logger.info("AiService created, about to call AiObjects.create()...")
instance.aiObjects = await AiObjects.create()
logger.info("AiObjects.create() completed")
# Initialize all submodules after aiObjects is ready
instance._initializeSubmodules()
logger.info("AiService submodules initialized")
return instance
# Helper methods
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
Args:
prompt: The base prompt template
placeholders: Dictionary of placeholder key-value pairs
Returns:
Prompt with placeholders replaced
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Skip if content is None or empty
if content is None:
continue
# Replace {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", str(content))
return full_prompt
async def _analyzePromptAndCreateOptions(self, prompt: str) -> AiCallOptions:
"""Analyze prompt to determine appropriate AiCallOptions parameters."""
try:
# Get dynamic enum values from Pydantic models
operationTypes = [e.value for e in OperationTypeEnum]
priorities = [e.value for e in PriorityEnum]
processingModes = [e.value for e in ProcessingModeEnum]
# Create analysis prompt for AI to determine operation type and parameters
analysisPrompt = f"""
You are an AI operation analyzer. Analyze the following prompt and determine the most appropriate operation type and parameters.
PROMPT TO ANALYZE:
{self.services.utils.sanitizePromptContent(prompt, 'userinput')}
Based on the prompt content, determine:
1. operationType: Choose the most appropriate from: {', '.join(operationTypes)}
2. priority: Choose from: {', '.join(priorities)}
3. processingMode: Choose from: {', '.join(processingModes)}
4. compressPrompt: true/false (true for story-like prompts, false for structured prompts with JSON/schemas)
5. compressContext: true/false (true to summarize context, false to process fully)
Respond with ONLY a JSON object in this exact format:
{{
"operationType": "dataAnalyse",
"priority": "balanced",
"processingMode": "basic",
"compressPrompt": true,
"compressContext": true
}}
"""
# Use AI to analyze the prompt
request = AiCallRequest(
prompt=analysisPrompt,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.SPEED,
processingMode=ProcessingModeEnum.BASIC,
compressPrompt=True,
compressContext=False
)
)
response = await self.aiObjects.call(request)
# Parse AI response using structured parsing with AiCallOptions model
try:
# Use parseJsonWithModel to parse response into AiCallOptions (handles enum conversion automatically)
analysis = parseJsonWithModel(response.content, AiCallOptions)
return analysis
except Exception as e:
logger.warning(f"Failed to parse AI analysis response: {e}")
except Exception as e:
logger.warning(f"Prompt analysis failed: {e}")
# Fallback to default options
return AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.BASIC
)
async def _callAiWithLooping(
self,
prompt: str,
options: AiCallOptions,
debugPrefix: str = "ai_call",
promptBuilder: Optional[callable] = None,
promptArgs: Optional[Dict[str, Any]] = None,
operationId: Optional[str] = None,
userPrompt: Optional[str] = None
) -> str:
"""
Shared core function for AI calls with repair-based looping system.
Automatically repairs broken JSON and continues generation seamlessly.
Args:
prompt: The prompt to send to AI
options: AI call configuration options
debugPrefix: Prefix for debug file names
promptBuilder: Optional function to rebuild prompts for continuation
promptArgs: Optional arguments for prompt builder
operationId: Optional operation ID for progress tracking
Returns:
Complete AI response after all iterations
"""
maxIterations = 50 # Prevent infinite loops
iteration = 0
allSections = [] # Accumulate all sections across iterations
lastRawResponse = None # Store last raw JSON response for continuation
documentMetadata = None # Store document metadata (title, filename) from first iteration
accumulationState = None # Track accumulation state for string accumulation
# Get parent log ID for iteration operations
parentLogId = None
if operationId:
parentLogId = self.services.chat.getOperationLogId(operationId)
while iteration < maxIterations:
iteration += 1
# Create separate operation for each iteration with parent reference
iterationOperationId = None
if operationId:
iterationOperationId = f"{operationId}_iter_{iteration}"
self.services.chat.progressLogStart(
iterationOperationId,
"AI Call",
f"Iteration {iteration}",
"",
parentId=parentLogId
)
# Build iteration prompt
# CRITICAL: Build continuation prompt if we have sections OR if we have a previous response (even if broken)
# This ensures continuation prompts are built even when JSON is so broken that no sections can be extracted
if (len(allSections) > 0 or lastRawResponse) and promptBuilder and promptArgs:
# This is a continuation - build continuation context with raw JSON and rebuild prompt
continuationContext = buildContinuationContext(allSections, lastRawResponse)
if not lastRawResponse:
logger.warning(f"Iteration {iteration}: No previous response available for continuation!")
# Filter promptArgs to only include parameters that buildGenerationPrompt accepts
# buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext
filteredPromptArgs = {
k: v for k, v in promptArgs.items()
if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content']
}
# Rebuild prompt with continuation context using the provided prompt builder
iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext)
else:
# First iteration - use original prompt
iterationPrompt = prompt
# Make AI call
try:
if iterationOperationId:
self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model")
request = AiCallRequest(
prompt=iterationPrompt,
context="",
options=options
)
# Write the ACTUAL prompt sent to AI
if iteration == 1:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
else:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiObjects.call(request)
result = response.content
# Update progress after AI call
if iterationOperationId:
self.services.chat.progressLogUpdate(iterationOperationId, 0.6, "AI response received")
# Write raw AI response to debug file
if iteration == 1:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
else:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration (only if workflow exists and has id)
if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
try:
self.services.chat.storeWorkflowStat(
self.services.workflow,
response,
f"ai.call.{debugPrefix}.iteration_{iteration}"
)
except Exception as statError:
# Don't break the main loop if stat storage fails
logger.warning(f"Failed to store workflow stat: {str(statError)}")
# Check for error response using generic error detection (errorCount > 0 or modelName == "error")
if hasattr(response, 'errorCount') and response.errorCount > 0:
errorMsg = f"Iteration {iteration}: Error response detected (errorCount={response.errorCount}), stopping loop: {result[:200] if result else 'empty'}"
logger.error(errorMsg)
break
if hasattr(response, 'modelName') and response.modelName == "error":
errorMsg = f"Iteration {iteration}: Error response detected (modelName=error), stopping loop: {result[:200] if result else 'empty'}"
logger.error(errorMsg)
break
if not result or not result.strip():
logger.warning(f"Iteration {iteration}: Empty response, stopping")
break
# Check if this is a text response (not document generation)
# Text responses don't need JSON parsing - return immediately after first successful response
isTextResponse = (promptBuilder is None and promptArgs is None) or debugPrefix == "text"
if isTextResponse:
# For text responses, return the text immediately - no JSON parsing needed
logger.info(f"Iteration {iteration}: Text response received, returning immediately")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
return result
# Store raw response for continuation (even if broken)
lastRawResponse = result
# Extract sections from response (handles both valid and broken JSON)
# Only for document generation (JSON responses)
# CRITICAL: Pass allSections and accumulationState to enable string accumulation
extractedSections, wasJsonComplete, parsedResult, accumulationState = self._extractSectionsFromResponse(
result, iteration, debugPrefix, allSections, accumulationState
)
# Define KPIs if we just entered accumulation mode (iteration 1, incomplete JSON)
if accumulationState and accumulationState.isAccumulationMode and iteration == 1 and not accumulationState.kpis:
logger.info(f"Iteration {iteration}: Defining KPIs for accumulation tracking")
continuationContext = buildContinuationContext(allSections, result)
kpiDefinitions = await self._defineKpisFromPrompt(
userPrompt or prompt,
parsedResult,
continuationContext,
debugPrefix
)
# Initialize KPIs with currentValue = 0
accumulationState.kpis = [{**kpi, "currentValue": 0} for kpi in kpiDefinitions]
logger.info(f"Defined {len(accumulationState.kpis)} KPIs: {[kpi.get('id') for kpi in accumulationState.kpis]}")
# Extract and validate KPIs (if in accumulation mode with KPIs defined)
if accumulationState and accumulationState.isAccumulationMode and accumulationState.kpis and parsedResult:
updatedKpis = JsonResponseHandler.extractKpiValuesFromJson(
parsedResult,
accumulationState.kpis
)
if updatedKpis:
shouldProceed, reason = JsonResponseHandler.validateKpiProgression(
accumulationState,
updatedKpis
)
if not shouldProceed:
logger.warning(f"Iteration {iteration}: KPI validation failed: {reason}")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, False)
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.9, f"KPI validation failed: {reason} ({iteration} iterations)")
break
# Update KPIs in accumulation state
accumulationState.kpis = updatedKpis
logger.info(f"Iteration {iteration}: KPIs updated: {[(kpi.get('id'), kpi.get('currentValue')) for kpi in updatedKpis]}")
# Check if all KPIs completed
allCompleted = True
for kpi in updatedKpis:
targetValue = kpi.get("targetValue", 0)
currentValue = kpi.get("currentValue", 0)
if currentValue < targetValue:
allCompleted = False
break
if allCompleted:
logger.info(f"Iteration {iteration}: All KPIs completed, finishing accumulation")
wasJsonComplete = True # Mark as complete to exit loop
# CRITICAL: Handle JSON fragments (continuation content)
# Fragment merging happens inside _extractSectionsFromResponse
# If merge fails (returns wasJsonComplete=True), stop iterations and complete JSON
if not extractedSections and allSections:
if wasJsonComplete:
# Merge failed - stop iterations, complete JSON with available data
logger.error(f"Iteration {iteration}: ❌ MERGE FAILED - Stopping iterations, completing JSON with available data")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, False)
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.9, f"Merge failed, completing JSON ({iteration} iterations)")
break
# Fragment was detected and merged successfully
logger.info(f"Iteration {iteration}: JSON fragment detected and merged, continuing")
# Don't break - fragment was merged, continue to get more content if needed
# Check if we should continue based on JSON completeness
shouldContinue = self._shouldContinueGeneration(
allSections,
iteration,
wasJsonComplete,
result
)
if shouldContinue:
if iterationOperationId:
self.services.chat.progressLogUpdate(iterationOperationId, 0.8, "Fragment merged, continuing")
self.services.chat.progressLogFinish(iterationOperationId, True)
continue
else:
# Done - fragment was merged and JSON is complete
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, fragment merged)")
logger.info(f"Generation complete after {iteration} iterations: fragment merged")
break
# Extract document metadata from first iteration if available
if iteration == 1 and parsedResult and not documentMetadata:
documentMetadata = self._extractDocumentMetadata(parsedResult)
# Update progress after parsing
if iterationOperationId:
if extractedSections:
self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections")
if not extractedSections:
# CRITICAL: If JSON was incomplete/broken, continue even if no sections extracted
# This allows the AI to retry and complete the broken JSON
if not wasJsonComplete:
logger.warning(f"Iteration {iteration}: No sections extracted from broken JSON, continuing for another attempt")
continue
# If JSON was complete but no sections extracted - check if it was a fragment
# Fragments are handled above, so if we get here and it's complete, it's an error
logger.warning(f"Iteration {iteration}: No sections extracted from complete JSON, stopping")
break
# Merge new sections with existing sections intelligently
# This handles the STANDARD CASE: broken JSON iterations must be merged together
# The break can occur anywhere - in any section, at any depth
allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration)
# Log merged sections for debugging
merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False)
self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
# Check if we should continue (completion detection)
# Simple logic: JSON completeness determines continuation
shouldContinue = self._shouldContinueGeneration(
allSections,
iteration,
wasJsonComplete,
result
)
if shouldContinue:
# Finish iteration operation (will continue with next iteration)
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
continue
else:
# Done - finish iteration and update main operation
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, {len(allSections)} sections)")
logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections")
break
except Exception as e:
logger.error(f"Error in AI call iteration {iteration}: {str(e)}")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, False)
break
if iteration >= maxIterations:
logger.warning(f"AI call stopped after maximum iterations ({maxIterations})")
# CRITICAL: Complete any incomplete structures in sections before building final result
# This ensures JSON is properly closed even if merge failed or iterations stopped early
allSections = JsonResponseHandler.completeIncompleteStructures(allSections)
# Build final result from accumulated sections
final_result = self._buildFinalResultFromSections(allSections, documentMetadata)
# Write final result to debug file
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
return final_result
# JSON merging logic moved to subJsonResponseHandling.py
async def _defineKpisFromPrompt(
self,
userPrompt: str,
parsedJson: Optional[Dict[str, Any]],
continuationContext: Dict[str, Any],
debugPrefix: str = "kpi"
) -> List[Dict[str, Any]]:
"""
Make separate AI call to define KPIs based on user prompt and delivered data.
Args:
userPrompt: Original user prompt
parsedJson: Parsed JSON from first iteration (if available)
continuationContext: Continuation context with delivered summary
Returns:
List of KPI definitions: [{"id": str, "description": str, "jsonPath": str, "targetValue": int}, ...]
"""
deliveredSummary = continuationContext.get("delivered_summary", "")
cutOffElement = continuationContext.get("cut_off_element")
elementBeforeCutoff = continuationContext.get("element_before_cutoff")
# Build prompt for KPI definition
kpiDefinitionPrompt = f"""Analyze the user request and delivered data to define KPIs (Key Performance Indicators) for tracking progress.
User Request:
{userPrompt}
Delivered Data Summary:
{deliveredSummary}
Current JSON Structure (if available):
{json.dumps(parsedJson, indent=2) if parsedJson else "Not available"}
Cut-off Element:
{cutOffElement if cutOffElement else "Not available"}
Last Complete Element:
{elementBeforeCutoff if elementBeforeCutoff else "Not available"}
Task: Define which JSON items should be tracked to measure completion progress.
For each trackable item, provide:
- id: Unique identifier (use descriptive name)
- description: What this KPI measures
- jsonPath: Path to extract value from JSON (use dot notation with array indices, e.g., "sections[0].elements[0].items")
- targetValue: Target value to reach (integer)
Return ONLY valid JSON in this format:
{{
"kpis": [
{{
"id": "unique_id",
"description": "Description of what is measured",
"jsonPath": "path.to.value",
"targetValue": 0
}}
]
}}
If no trackable items can be identified, return: {{"kpis": []}}
"""
try:
request = AiCallRequest(
prompt=kpiDefinitionPrompt,
options=AiCallOptions(
operationType=OperationTypeEnum.DATA_ANALYSE,
priority=PriorityEnum.SPEED,
processingMode=ProcessingModeEnum.BASIC
)
)
# Write KPI definition prompt to debug file
self.services.utils.writeDebugFile(kpiDefinitionPrompt, f"{debugPrefix}_kpi_definition_prompt")
response = await self.aiObjects.call(request)
# Write KPI definition response to debug file
self.services.utils.writeDebugFile(response.content, f"{debugPrefix}_kpi_definition_response")
# Parse response
extracted = extractJsonString(response.content)
kpiResponse = json.loads(extracted)
kpiDefinitions = kpiResponse.get("kpis", [])
logger.info(f"Defined {len(kpiDefinitions)} KPIs for tracking")
return kpiDefinitions
except Exception as e:
logger.warning(f"Failed to define KPIs: {e}, continuing without KPI tracking")
return []
def _extractSectionsFromResponse(
self,
result: str,
iteration: int,
debugPrefix: str,
allSections: List[Dict[str, Any]] = None,
accumulationState: Optional[JsonAccumulationState] = None
) -> Tuple[List[Dict[str, Any]], bool, Optional[Dict[str, Any]], Optional[JsonAccumulationState]]:
"""
Extract sections from AI response, handling both valid and broken JSON.
NEW BEHAVIOR:
- First iteration: Check if complete, if not start accumulation
- Subsequent iterations: Accumulate strings, parse when complete
Returns:
Tuple of:
- sections: Extracted sections
- wasJsonComplete: True if JSON is complete
- parsedResult: Parsed JSON object
- updatedAccumulationState: Updated accumulation state (None if not in accumulation mode)
"""
if allSections is None:
allSections = []
if iteration == 1:
# First iteration - check if complete
parsed = None
try:
extracted = extractJsonString(result)
parsed = json.loads(extracted)
# Check completeness
if JsonResponseHandler.isJsonComplete(parsed):
# Complete JSON - no accumulation needed
sections = extractSectionsFromDocument(parsed)
logger.info(f"Iteration 1: Complete JSON detected, no accumulation needed")
return sections, True, parsed, None # No accumulation
except Exception:
pass
# Incomplete - try to extract partial sections from broken JSON
logger.info(f"Iteration 1: Incomplete JSON detected, attempting to extract partial sections")
partialSections = []
if parsed:
# Try to extract sections from parsed (even if incomplete)
partialSections = extractSectionsFromDocument(parsed)
else:
# Try to repair broken JSON and extract sections
try:
repaired = repairBrokenJson(result)
if repaired:
partialSections = extractSectionsFromDocument(repaired)
parsed = repaired # Use repaired version for accumulation state
except Exception:
pass # If repair fails, continue with empty sections
# Define KPIs (async call - need to handle this)
# For now, create accumulation state without KPIs, will be updated after async call
accumulationState = JsonAccumulationState(
accumulatedJsonString=result,
isAccumulationMode=True,
lastParsedResult=parsed,
allSections=partialSections,
kpis=[]
)
# Note: KPI definition will be done in the caller (async context)
return partialSections, False, parsed, accumulationState
else:
# Subsequent iterations - accumulate
if accumulationState and accumulationState.isAccumulationMode:
accumulated, sections, isComplete, parsedResult = \
JsonResponseHandler.accumulateAndParseJsonFragments(
accumulationState.accumulatedJsonString,
result,
allSections,
iteration
)
# Update accumulation state
accumulationState.accumulatedJsonString = accumulated
accumulationState.lastParsedResult = parsedResult
accumulationState.allSections = allSections + sections if sections else allSections
accumulationState.isAccumulationMode = not isComplete
# Log accumulated JSON for debugging
if parsedResult:
accumulated_json_str = json.dumps(parsedResult, indent=2, ensure_ascii=False)
self.services.utils.writeDebugFile(accumulated_json_str, f"{debugPrefix}_accumulated_json_iteration_{iteration}.json")
return sections, isComplete, parsedResult, accumulationState
else:
# No accumulation mode - process normally (shouldn't happen)
logger.warning(f"Iteration {iteration}: No accumulation state but iteration > 1")
return [], False, None, None
def _shouldContinueGeneration(
self,
allSections: List[Dict[str, Any]],
iteration: int,
wasJsonComplete: bool,
rawResponse: str = None
) -> bool:
"""
Determine if AI generation loop should continue.
CRITICAL: This is ONLY about AI Loop Completion, NOT Action DoD!
Action DoD is checked AFTER the AI Loop completes in _refineDecide.
Simple logic:
- If JSON parsing failed or incomplete → continue (needs more content)
- If JSON parses successfully and is complete → stop (all content delivered)
- Loop detection prevents infinite loops
CRITICAL: JSON completeness is determined by parsing, NOT by last character check!
Returns True if we should continue, False if AI Loop is done.
"""
if len(allSections) == 0:
return True # No sections yet, continue
# CRITERION 1: If JSON was incomplete/broken (parsing failed or incomplete) - continue to repair/complete
if not wasJsonComplete:
logger.info(f"Iteration {iteration}: JSON incomplete/broken - continuing to complete")
return True
# CRITERION 2: JSON is complete (parsed successfully) - check for loop detection
if self._isStuckInLoop(allSections, iteration):
logger.warning(f"Iteration {iteration}: Detected potential infinite loop - stopping AI loop")
return False
# JSON is complete and not stuck in loop - done
logger.info(f"Iteration {iteration}: JSON complete - AI loop done")
return False
def _isStuckInLoop(
self,
allSections: List[Dict[str, Any]],
iteration: int
) -> bool:
"""
Detect if we're stuck in a loop (same content being repeated).
Generic approach: Check if recent iterations are adding minimal or duplicate content.
"""
if iteration < 3:
return False # Need at least 3 iterations to detect a loop
if len(allSections) == 0:
return False
# Check if last section is very small (might be stuck)
lastSection = allSections[-1]
elements = lastSection.get("elements", [])
if isinstance(elements, list) and elements:
lastElem = elements[-1] if elements else {}
else:
lastElem = elements if isinstance(elements, dict) else {}
# Check content size of last section
lastSectionSize = 0
if isinstance(lastElem, dict):
for key, value in lastElem.items():
if isinstance(value, str):
lastSectionSize += len(value)
elif isinstance(value, list):
lastSectionSize += len(str(value))
# If last section is very small and we've done many iterations, might be stuck
if lastSectionSize < 100 and iteration > 10:
logger.warning(f"Potential loop detected: iteration {iteration}, last section size {lastSectionSize}")
return True
return False
def _extractDocumentMetadata(
self,
parsedResult: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""
Extract document metadata (title, filename) from parsed AI response.
Returns dict with 'title' and 'filename' keys if found, None otherwise.
"""
if not isinstance(parsedResult, dict):
return None
# Try to get from documents array (preferred structure)
if "documents" in parsedResult and isinstance(parsedResult["documents"], list) and len(parsedResult["documents"]) > 0:
firstDoc = parsedResult["documents"][0]
if isinstance(firstDoc, dict):
title = firstDoc.get("title")
filename = firstDoc.get("filename")
if title or filename:
return {
"title": title,
"filename": filename
}
return None
def _buildFinalResultFromSections(
self,
allSections: List[Dict[str, Any]],
documentMetadata: Optional[Dict[str, Any]] = None
) -> str:
"""
Build final JSON result from accumulated sections.
Uses AI-provided metadata (title, filename) if available.
"""
if not allSections:
return ""
# Extract metadata from AI response if available
title = "Generated Document"
filename = "document.json"
if documentMetadata:
if documentMetadata.get("title"):
title = documentMetadata["title"]
if documentMetadata.get("filename"):
filename = documentMetadata["filename"]
# Build documents structure
# Assuming single document for now
documents = [{
"id": "doc_1",
"title": title,
"filename": filename,
"sections": allSections
}]
result = {
"metadata": {
"split_strategy": "single_document",
"source_documents": [],
"extraction_method": "ai_generation"
},
"documents": documents
}
return json.dumps(result, indent=2)
# Public API Methods
# Planning AI Call
async def callAiPlanning(
self,
prompt: str,
placeholders: Optional[List[PromptPlaceholder]] = None,
debugType: Optional[str] = None
) -> str:
"""
Planning AI call for task planning, action planning, action selection, etc.
Always uses static parameters optimized for planning tasks.
Args:
prompt: The planning prompt
placeholders: Optional list of placeholder replacements
debugType: Optional debug file type identifier (e.g., 'taskplan', 'dynamic', 'intentanalysis')
If not provided, defaults to 'plan'
Returns:
Planning JSON response
"""
await self.ensureAiObjectsInitialized()
# Planning calls always use static parameters
options = AiCallOptions(
operationType=OperationTypeEnum.PLAN,
priority=PriorityEnum.QUALITY,
processingMode=ProcessingModeEnum.DETAILED,
compressPrompt=False,
compressContext=False
)
# Build full prompt with placeholders
if placeholders:
placeholdersDict = {p.label: p.content for p in placeholders}
fullPrompt = self._buildPromptWithPlaceholders(prompt, placeholdersDict)
else:
fullPrompt = prompt
# Root-cause fix: planning must return raw single-shot JSON, not section-based output
request = AiCallRequest(
prompt=fullPrompt,
context="",
options=options
)
# Debug: persist prompt/response for analysis with context-specific naming
debugPrefix = debugType if debugType else "plan"
self.services.utils.writeDebugFile(fullPrompt, f"{debugPrefix}_prompt")
response = await self.aiObjects.call(request)
result = response.content or ""
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
return result
async def callAiContent(
self,
prompt: str,
options: AiCallOptions,
contentParts: Optional[List[ContentPart]] = None,
outputFormat: Optional[str] = None,
title: Optional[str] = None,
parentOperationId: Optional[str] = None # Parent operation ID for hierarchical logging
) -> AiResponse:
"""
Unified AI content processing method (replaces callAiDocuments and callAiText).
Args:
prompt: The main prompt for the AI call
contentParts: Optional list of already-extracted content parts (preferred)
options: AI call configuration options (REQUIRED - operationType must be set)
outputFormat: Optional output format for document generation (e.g., 'pdf', 'docx', 'xlsx')
title: Optional title for generated documents
parentOperationId: Optional parent operation ID for hierarchical logging
Returns:
AiResponse with content, metadata, and optional documents
"""
await self.ensureAiObjectsInitialized()
# Create separate operationId for detailed progress tracking
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
aiOperationId = f"ai_content_{workflowId}_{int(time.time())}"
# Get parent log ID if parent operation exists
parentLogId = None
if parentOperationId:
parentLogId = self.services.chat.getOperationLogId(parentOperationId)
# Start progress tracking with parent reference
self.services.chat.progressLogStart(
aiOperationId,
"AI content processing",
"Content Processing",
f"Format: {outputFormat or 'text'}",
parentId=parentLogId
)
try:
# Extraction is now separate - contentParts must be extracted before calling
# Require operationType to be set before calling
opType = getattr(options, "operationType", None)
if not opType:
# If outputFormat is specified, default to DATA_GENERATE
if outputFormat:
options.operationType = OperationTypeEnum.DATA_GENERATE
opType = OperationTypeEnum.DATA_GENERATE
else:
self.services.chat.progressLogUpdate(aiOperationId, 0.1, "Analyzing prompt parameters")
analyzedOptions = await self._analyzePromptAndCreateOptions(prompt)
if analyzedOptions and hasattr(analyzedOptions, "operationType") and analyzedOptions.operationType:
options.operationType = analyzedOptions.operationType
# Merge other analyzed options
if hasattr(analyzedOptions, "priority"):
options.priority = analyzedOptions.priority
if hasattr(analyzedOptions, "processingMode"):
options.processingMode = analyzedOptions.processingMode
if hasattr(analyzedOptions, "compressPrompt"):
options.compressPrompt = analyzedOptions.compressPrompt
if hasattr(analyzedOptions, "compressContext"):
options.compressContext = analyzedOptions.compressContext
else:
# Default to DATA_ANALYSE if analysis fails
options.operationType = OperationTypeEnum.DATA_ANALYSE
opType = options.operationType
# Handle IMAGE_GENERATE operations
if opType == OperationTypeEnum.IMAGE_GENERATE:
self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation")
request = AiCallRequest(
prompt=prompt,
context="",
options=options
)
response = await self.aiObjects.call(request)
if response.content:
# Build document data for image
imageDoc = DocumentData(
documentName="generated_image.png",
documentData=response.content,
mimeType="image/png"
)
metadata = AiResponseMetadata(
title=title or "Generated Image",
operationType=opType.value
)
self.services.chat.storeWorkflowStat(
self.services.workflow,
response,
"ai.generate.image"
)
self.services.chat.progressLogUpdate(aiOperationId, 0.9, "Image generated")
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=response.content,
metadata=metadata,
documents=[imageDoc]
)
else:
errorMsg = f"No image data returned: {response.content}"
logger.error(f"Error in AI image generation: {errorMsg}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(errorMsg)
# Handle WEB_SEARCH and WEB_CRAWL operations
if opType == OperationTypeEnum.WEB_SEARCH or opType == OperationTypeEnum.WEB_CRAWL:
self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
request = AiCallRequest(
prompt=prompt, # Raw JSON prompt - connector will parse it
context="",
options=options
)
response = await self.aiObjects.call(request)
if response.content:
metadata = AiResponseMetadata(
operationType=opType.value
)
self.services.chat.storeWorkflowStat(
self.services.workflow,
response,
f"ai.{opType.name.lower()}"
)
self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed")
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=response.content,
metadata=metadata
)
else:
errorMsg = f"No content returned from {opType.name}: {response.content}"
logger.error(f"Error in {opType.name}: {errorMsg}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(errorMsg)
# Handle document generation (outputFormat specified)
if outputFormat:
# CRITICAL: For document generation with JSON templates, NEVER compress the prompt
options.compressPrompt = False
options.compressContext = False
# Convert contentParts to text for generation prompt (if provided)
if contentParts:
# Convert contentParts to text for generation prompt
content_for_generation = "\n\n".join([f"[{part.label}]\n{part.data}" for part in contentParts if part.data])
else:
content_for_generation = None
self.services.chat.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
generation_prompt = await buildGenerationPrompt(
outputFormat, prompt, title, content_for_generation, None
)
promptArgs = {
"outputFormat": outputFormat,
"userPrompt": prompt,
"title": title,
"extracted_content": content_for_generation
}
self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
# Extract user prompt from promptArgs for task completion analysis
userPrompt = None
if promptArgs:
userPrompt = promptArgs.get("userPrompt") or promptArgs.get("user_prompt")
generated_json = await self._callAiWithLooping(
generation_prompt,
options,
"document_generation",
buildGenerationPrompt,
promptArgs,
aiOperationId,
userPrompt=userPrompt
)
self.services.chat.progressLogUpdate(aiOperationId, 0.7, "Parsing generated JSON")
try:
extracted_json = self.services.utils.jsonExtractString(generated_json)
generated_data = json.loads(extracted_json)
except json.JSONDecodeError as e:
logger.error(f"Failed to parse generated JSON: {str(e)}")
self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(f"Generated content is not valid JSON: {str(e)}")
# Extract title and filename from generated document structure
extractedTitle = title
extractedFilename = None
if isinstance(generated_data, dict) and "documents" in generated_data:
docs = generated_data["documents"]
if isinstance(docs, list) and len(docs) > 0:
firstDoc = docs[0]
if isinstance(firstDoc, dict):
if firstDoc.get("title"):
extractedTitle = firstDoc["title"]
if firstDoc.get("filename"):
extractedFilename = firstDoc["filename"]
# Ensure metadata contains the extracted title
if "metadata" not in generated_data:
generated_data["metadata"] = {}
if extractedTitle:
generated_data["metadata"]["title"] = extractedTitle
# Create separate operation for content rendering
renderOperationId = f"{aiOperationId}_render"
renderParentLogId = self.services.chat.getOperationLogId(aiOperationId)
self.services.chat.progressLogStart(
renderOperationId,
"Content Rendering",
"Rendering",
f"Format: {outputFormat}",
parentId=renderParentLogId
)
try:
from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
generationService = GenerationService(self.services)
self.services.chat.progressLogUpdate(renderOperationId, 0.5, f"Rendering to {outputFormat} format")
rendered_content, mime_type = await generationService.renderReport(
generated_data, outputFormat, extractedTitle or "Generated Document", prompt, self
)
self.services.chat.progressLogFinish(renderOperationId, True)
# Determine document name
if extractedFilename:
documentName = extractedFilename
elif extractedTitle and extractedTitle != "Generated Document":
sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", extractedTitle)
sanitized = re.sub(r"_+", "_", sanitized).strip("_")
if sanitized:
if not sanitized.lower().endswith(f".{outputFormat}"):
documentName = f"{sanitized}.{outputFormat}"
else:
documentName = sanitized
else:
documentName = f"generated.{outputFormat}"
else:
documentName = f"generated.{outputFormat}"
# Build document data
docData = DocumentData(
documentName=documentName,
documentData=rendered_content,
mimeType=mime_type,
sourceJson=generated_data # Preserve source JSON for structure validation
)
metadata = AiResponseMetadata(
title=extractedTitle or title or "Generated Document",
filename=extractedFilename,
operationType=opType.value if opType else None
)
self.services.utils.writeDebugFile(str(generated_data), "document_generation_response")
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=json.dumps(generated_data),
metadata=metadata,
documents=[docData]
)
except Exception as e:
logger.error(f"Error rendering document: {str(e)}")
if renderOperationId:
self.services.chat.progressLogFinish(renderOperationId, False)
self.services.chat.progressLogFinish(aiOperationId, False)
raise ValueError(f"Rendering failed: {str(e)}")
# Handle text processing (no outputFormat)
self.services.chat.progressLogUpdate(aiOperationId, 0.5, "Processing text call")
if contentParts:
# Process contentParts through AI
# Convert contentParts to text for prompt
contentText = "\n\n".join([f"[{part.label}]\n{part.data}" for part in contentParts if part.data])
fullPrompt = f"{prompt}\n\n{contentText}" if contentText else prompt
result_content = await self._callAiWithLooping(
fullPrompt, options, "text", None, None, aiOperationId
)
else:
# Direct text call (no documents to process)
result_content = await self._callAiWithLooping(
prompt, options, "text", None, None, aiOperationId
)
metadata = AiResponseMetadata(
operationType=opType.value if opType else None
)
self.services.chat.progressLogFinish(aiOperationId, True)
return AiResponse(
content=result_content,
metadata=metadata
)
except Exception as e:
logger.error(f"Error in callAiContent: {str(e)}")
self.services.chat.progressLogFinish(aiOperationId, False)
raise