gateway/modules/services/serviceAi/mainServiceAi.py
2025-12-16 00:27:33 +01:00

1351 lines
66 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import json
import logging
import re
import time
from typing import Dict, Any, List, Optional, Tuple
from modules.datamodels.datamodelChat import PromptPlaceholder
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.interfaces.interfaceAiObjects import AiObjects
from modules.shared.jsonUtils import (
extractJsonString,
repairBrokenJson,
extractSectionsFromDocument,
buildContinuationContext,
parseJsonWithModel
)
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
from modules.datamodels.datamodelAi import JsonAccumulationState
# Module-level logger used by every method in this file
logger = logging.getLogger(__name__)
# Rebuild the model to resolve forward references.
# NOTE(review): presumably required because AiCallRequest (a Pydantic model, per
# `model_rebuild`) references types that only resolve after the imports above - confirm.
AiCallRequest.model_rebuild()
class AiService:
"""AI service with core operations integrated."""
def __init__(self, serviceCenter=None) -> None:
"""Initialize AI service with service center access.
Args:
serviceCenter: Service center instance for accessing other services
"""
self.services = serviceCenter
# Only depend on interfaces
self.aiObjects = None # Will be initialized in create() or ensureAiObjectsInitialized()
# Submodules initialized as None - will be set in _initializeSubmodules() after aiObjects is ready
self.extractionService = None
def _initializeSubmodules(self):
"""Initialize all submodules after aiObjects is ready."""
if self.aiObjects is None:
raise RuntimeError("aiObjects must be initialized before initializing submodules")
if self.extractionService is None:
logger.info("Initializing ExtractionService...")
self.extractionService = ExtractionService(self.services)
async def callAi(self, request: AiCallRequest, progressCallback=None):
    """Router: send a request to the extraction service or straight to the AI interface.

    Requests that carry ``contentParts`` are processed through
    ``extractionService.processContentPartsWithAi``; all others go to
    ``aiObjects.callWithTextContext``. This replaces direct calls to
    ``self.aiObjects.call()`` so content-part processing is routed through the
    serviceExtraction layer.

    ``aiObjects`` and the submodules are initialized lazily first. Internal helpers
    call this router directly, and without this step they would fail on ``None``
    attributes when used before a public entry point ran.

    Args:
        request: The AI call request.
        progressCallback: Optional callback, forwarded only to content-part processing.

    Returns:
        The response object of the selected backend (callers read ``.content``).
    """
    # No-op when already initialized; otherwise creates aiObjects + submodules
    await self.ensureAiObjectsInitialized()
    if getattr(request, 'contentParts', None):
        return await self.extractionService.processContentPartsWithAi(
            request, self.aiObjects, progressCallback
        )
    return await self.aiObjects.callWithTextContext(request)
async def ensureAiObjectsInitialized(self):
"""Ensure aiObjects is initialized and submodules are ready."""
if self.aiObjects is None:
logger.info("Lazy initializing AiObjects...")
self.aiObjects = await AiObjects.create()
logger.info("AiObjects initialization completed")
# Initialize submodules after aiObjects is ready
self._initializeSubmodules()
@classmethod
async def create(cls, serviceCenter=None) -> "AiService":
    """Build an ``AiService`` that is fully initialized before it is returned.

    Unlike the lazy ``ensureAiObjectsInitialized`` path, this always awaits
    ``AiObjects.create()`` and then initializes the submodules.

    Args:
        serviceCenter: Optional service center passed to the constructor.

    Returns:
        The ready-to-use service instance.
    """
    logger.info("AiService.create() called")
    service = cls(serviceCenter)
    logger.info("AiService created, about to call AiObjects.create()...")
    createdAiObjects = await AiObjects.create()
    service.aiObjects = createdAiObjects
    logger.info("AiObjects.create() completed")
    # Submodules depend on aiObjects, so they come last
    service._initializeSubmodules()
    logger.info("AiService submodules initialized")
    return service
# Helper methods
def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
"""
Build full prompt by replacing placeholders with their content.
Uses the new {{KEY:placeholder}} format.
Args:
prompt: The base prompt template
placeholders: Dictionary of placeholder key-value pairs
Returns:
Prompt with placeholders replaced
"""
if not placeholders:
return prompt
full_prompt = prompt
for placeholder, content in placeholders.items():
# Skip if content is None or empty
if content is None:
continue
# Replace {{KEY:placeholder}}
full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", str(content))
return full_prompt
async def _analyzePromptAndCreateOptions(self, prompt: str) -> AiCallOptions:
    """
    Ask the AI which AiCallOptions suit ``prompt``.

    The AI chooses operation type, priority, processing mode and the compression flags.
    The analysis runs as a DATA_ANALYSE / SPEED / BASIC call. Every failure (building the
    request, the call itself, or parsing the reply into AiCallOptions) is logged as a
    warning, and the method then falls back to DATA_ANALYSE / BALANCED / BASIC.

    Args:
        prompt: Prompt to classify; it is sanitized before being embedded.

    Returns:
        The AiCallOptions parsed from the AI reply, or the fallback options.
    """
    try:
        # Offer every currently defined enum value to the model
        operationTypes = [member.value for member in OperationTypeEnum]
        priorities = [member.value for member in PriorityEnum]
        processingModes = [member.value for member in ProcessingModeEnum]
        analysisPrompt = f"""
You are an AI operation analyzer. Analyze the following prompt and determine the most appropriate operation type and parameters.
PROMPT TO ANALYZE:
{self.services.utils.sanitizePromptContent(prompt, 'userinput')}
Based on the prompt content, determine:
1. operationType: Choose the most appropriate from: {', '.join(operationTypes)}
2. priority: Choose from: {', '.join(priorities)}
3. processingMode: Choose from: {', '.join(processingModes)}
4. compressPrompt: true/false (true for story-like prompts, false for structured prompts with JSON/schemas)
5. compressContext: true/false (true to summarize context, false to process fully)
Respond with ONLY a JSON object in this exact format:
{{
"operationType": "dataAnalyse",
"priority": "balanced",
"processingMode": "basic",
"compressPrompt": true,
"compressContext": true
}}
"""
        analysisOptions = AiCallOptions(
            operationType=OperationTypeEnum.DATA_ANALYSE,
            priority=PriorityEnum.SPEED,
            processingMode=ProcessingModeEnum.BASIC,
            compressPrompt=True,
            compressContext=False
        )
        analysisResponse = await self.callAi(AiCallRequest(prompt=analysisPrompt, options=analysisOptions))
    except Exception as e:
        logger.warning(f"Prompt analysis failed: {e}")
    else:
        try:
            # parseJsonWithModel converts the reply into AiCallOptions (incl. enum values)
            return parseJsonWithModel(analysisResponse.content, AiCallOptions)
        except Exception as e:
            logger.warning(f"Failed to parse AI analysis response: {e}")
    # Reached only when analysis or parsing failed
    return AiCallOptions(
        operationType=OperationTypeEnum.DATA_ANALYSE,
        priority=PriorityEnum.BALANCED,
        processingMode=ProcessingModeEnum.BASIC
    )
async def _callAiWithLooping(
    self,
    prompt: str,
    options: AiCallOptions,
    debugPrefix: str = "ai_call",
    promptBuilder: Optional[callable] = None,
    promptArgs: Optional[Dict[str, Any]] = None,
    operationId: Optional[str] = None,
    userPrompt: Optional[str] = None
) -> str:
    """
    Run an AI call, looping with continuation prompts until the JSON output is complete.

    Text calls: when both promptBuilder and promptArgs are None, or debugPrefix == "text",
    the first non-empty, non-error response is returned unchanged.

    Document-generation calls:
    - each response is parsed (or repaired) into sections;
    - sections are merged across iterations;
    - KPIs are tracked while incomplete JSON is accumulated;
    - the next prompt is rebuilt with continuation context via promptBuilder.

    Every per-iteration progress operation that is started is also finished, including
    on error, empty-response and no-section exits.

    Args:
        prompt: Prompt for the first iteration.
        options: AI call configuration options, reused for every iteration.
        debugPrefix: Prefix for debug file names and workflow stat labels.
        promptBuilder: Optional awaitable-returning callable that rebuilds the prompt for
            continuation. It receives the filtered promptArgs plus ``continuationContext``.
        promptArgs: Optional arguments for promptBuilder.
        operationId: Optional operation ID for progress tracking; each iteration gets a
            child operation ``<operationId>_iter_<n>``.
        userPrompt: Original user request for KPI definition (falls back to ``prompt``).

    Returns:
        The raw response for text calls. Otherwise the final JSON document built from all
        accumulated sections ("" when no sections were collected).
    """
    maxIterations = 50  # Prevent infinite loops
    iteration = 0
    allSections = []  # Sections accumulated across iterations
    lastRawResponse = None  # Last raw response (possibly broken JSON), used for continuation
    documentMetadata = None  # Document title/filename from the first parsed response
    accumulationState = None  # String-accumulation state while the JSON is incomplete
    # Iteration operations are children of the caller's operation
    parentOperationId = operationId
    formatBytes = self._formatByteCountForProgress

    def finishIterationOperation(iterationOpId: Optional[str], success: bool) -> None:
        """Finish a per-iteration progress operation (no-op when progress tracking is off)."""
        if iterationOpId:
            self.services.chat.progressLogFinish(iterationOpId, success)

    while iteration < maxIterations:
        iteration += 1
        iterationOperationId = None
        if operationId:
            iterationOperationId = f"{operationId}_iter_{iteration}"
            self.services.chat.progressLogStart(
                iterationOperationId,
                "AI Call",
                f"Iteration {iteration}",
                "",
                parentOperationId=parentOperationId
            )

        # Build a continuation prompt whenever there is earlier output: sections OR a raw
        # response. This way even JSON too broken to yield sections still gets continued.
        if (allSections or lastRawResponse) and promptBuilder and promptArgs:
            try:
                continuationContext = buildContinuationContext(allSections, lastRawResponse)
                if not lastRawResponse:
                    logger.warning(f"Iteration {iteration}: No previous response available for continuation!")
                # Pass only the parameters the prompt builder accepts;
                # continuationContext is supplied separately below
                filteredPromptArgs = {
                    key: value for key, value in promptArgs.items()
                    if key in ('outputFormat', 'userPrompt', 'title', 'extracted_content', 'services')
                }
                if not filteredPromptArgs.get('services') and hasattr(self, 'services'):
                    filteredPromptArgs['services'] = self.services
                iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext)
            except Exception:
                # Prompt-building errors still propagate, but close this iteration's operation first
                finishIterationOperation(iterationOperationId, False)
                raise
        else:
            # First iteration (or no prompt builder): use the original prompt
            iterationPrompt = prompt

        try:
            if iterationOperationId:
                self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model")
            request = AiCallRequest(
                prompt=iterationPrompt,
                context="",
                options=options
            )
            # Write the ACTUAL prompt sent to the AI
            promptDebugName = f"{debugPrefix}_prompt" if iteration == 1 else f"{debugPrefix}_prompt_iteration_{iteration}"
            self.services.utils.writeDebugFile(iterationPrompt, promptDebugName)
            response = await self.callAi(request)
            result = response.content

            # Bytes so far = string 'content' of the accumulated sections + this response
            bytesReceived = len(result.encode('utf-8')) if result else 0
            totalBytesSoFar = sum(
                len(section.get('content').encode('utf-8'))
                for section in allSections
                if isinstance(section.get('content'), str)
            ) + bytesReceived
            if iterationOperationId:
                self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({formatBytes(totalBytesSoFar)})")

            # Write raw AI response to debug file
            responseDebugName = f"{debugPrefix}_response" if iteration == 1 else f"{debugPrefix}_response_iteration_{iteration}"
            self.services.utils.writeDebugFile(result, responseDebugName)

            # Emit stats for this iteration (only if a workflow with an id exists)
            if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
                try:
                    self.services.chat.storeWorkflowStat(
                        self.services.workflow,
                        response,
                        f"ai.call.{debugPrefix}.iteration_{iteration}"
                    )
                except Exception as statError:
                    # Stat storage is best-effort and must not break the loop
                    logger.warning(f"Failed to store workflow stat: {str(statError)}")

            # Stop on error responses (errorCount > 0 or modelName == "error") and on empty ones
            if hasattr(response, 'errorCount') and response.errorCount > 0:
                logger.error(f"Iteration {iteration}: Error response detected (errorCount={response.errorCount}), stopping loop: {result[:200] if result else 'empty'}")
                finishIterationOperation(iterationOperationId, False)
                break
            if hasattr(response, 'modelName') and response.modelName == "error":
                logger.error(f"Iteration {iteration}: Error response detected (modelName=error), stopping loop: {result[:200] if result else 'empty'}")
                finishIterationOperation(iterationOperationId, False)
                break
            if not result or not result.strip():
                logger.warning(f"Iteration {iteration}: Empty response, stopping")
                finishIterationOperation(iterationOperationId, False)
                break

            # Text responses need no JSON handling: return the first successful response
            isTextResponse = (promptBuilder is None and promptArgs is None) or debugPrefix == "text"
            if isTextResponse:
                logger.info(f"Iteration {iteration}: Text response received, returning immediately")
                finishIterationOperation(iterationOperationId, True)
                return result

            # Keep the raw response for the next continuation prompt, even if it is broken
            lastRawResponse = result

            # Extract sections (valid or broken JSON); passing allSections and
            # accumulationState enables string accumulation across iterations
            extractedSections, wasJsonComplete, parsedResult, accumulationState = self._extractSectionsFromResponse(
                result, iteration, debugPrefix, allSections, accumulationState
            )

            # Merge BEFORE KPI validation so sections survive a failed validation
            if extractedSections:
                allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration)

            # Define KPIs once, right after entering accumulation mode on iteration 1
            if accumulationState and accumulationState.isAccumulationMode and iteration == 1 and not accumulationState.kpis:
                logger.info(f"Iteration {iteration}: Defining KPIs for accumulation tracking")
                continuationContext = buildContinuationContext(allSections, result)
                kpiDefinitions = await self._defineKpisFromPrompt(
                    userPrompt or prompt,
                    result,  # raw (possibly incomplete) JSON string from iteration 1
                    continuationContext,
                    debugPrefix
                )
                # Every KPI starts at currentValue = 0
                accumulationState.kpis = [{**kpi, "currentValue": 0} for kpi in kpiDefinitions]
                logger.info(f"Defined {len(accumulationState.kpis)} KPIs: {[kpi.get('id') for kpi in accumulationState.kpis]}")

            # Extract and validate KPI progress while accumulating
            if accumulationState and accumulationState.isAccumulationMode and accumulationState.kpis:
                # Fall back to the accumulated JSON string when the repaired JSON yields no values,
                # because repairBrokenJson may lose data (e.g. an empty rows array)
                updatedKpis = []
                if parsedResult:
                    try:
                        updatedKpis = JsonResponseHandler.extractKpiValuesFromJson(
                            parsedResult,
                            accumulationState.kpis
                        )
                        hasValidValues = any(kpi.get("currentValue", 0) > 0 for kpi in updatedKpis)
                        if not hasValidValues and accumulationState.accumulatedJsonString:
                            logger.debug("Repaired JSON has empty KPI values, trying accumulated JSON string")
                            updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson(
                                accumulationState.accumulatedJsonString,
                                accumulationState.kpis
                            )
                    except Exception as e:
                        logger.debug(f"Error extracting KPIs from parsedResult: {e}")
                        updatedKpis = []
                if not updatedKpis and accumulationState.accumulatedJsonString:
                    try:
                        updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson(
                            accumulationState.accumulatedJsonString,
                            accumulationState.kpis
                        )
                    except Exception as e:
                        logger.debug(f"Error extracting KPIs from accumulated JSON string: {e}")
                        updatedKpis = []
                if updatedKpis:
                    shouldProceed, reason = JsonResponseHandler.validateKpiProgression(
                        accumulationState,
                        updatedKpis
                    )
                    if not shouldProceed:
                        logger.warning(f"Iteration {iteration}: KPI validation failed: {reason}")
                        finishIterationOperation(iterationOperationId, False)
                        if operationId:
                            self.services.chat.progressLogUpdate(operationId, 0.9, f"KPI validation failed: {reason} ({iteration} iterations)")
                        break
                    accumulationState.kpis = updatedKpis
                    logger.info(f"Iteration {iteration}: KPIs updated: {[(kpi.get('id'), kpi.get('currentValue')) for kpi in updatedKpis]}")
                    allCompleted = all(
                        not kpi.get("currentValue", 0) < kpi.get("targetValue", 0)
                        for kpi in updatedKpis
                    )
                    if allCompleted:
                        logger.info(f"Iteration {iteration}: All KPIs completed, finishing accumulation")
                        wasJsonComplete = True  # Treat as complete so the loop can finish

            # JSON fragments (continuation content) are merged inside _extractSectionsFromResponse.
            # No new sections + complete flag means the merge failed: stop with the data we have.
            if not extractedSections and allSections:
                if wasJsonComplete:
                    logger.error(f"Iteration {iteration}: ❌ MERGE FAILED - Stopping iterations, completing JSON with available data")
                    finishIterationOperation(iterationOperationId, False)
                    if operationId:
                        self.services.chat.progressLogUpdate(operationId, 0.9, f"Merge failed, completing JSON ({iteration} iterations)")
                    break
                logger.info(f"Iteration {iteration}: JSON fragment detected and merged, continuing")
                shouldContinue = self._shouldContinueGeneration(
                    allSections,
                    iteration,
                    wasJsonComplete,
                    result
                )
                if shouldContinue:
                    if iterationOperationId:
                        self.services.chat.progressLogUpdate(iterationOperationId, 0.8, "Fragment merged, continuing")
                    finishIterationOperation(iterationOperationId, True)
                    continue
                finishIterationOperation(iterationOperationId, True)
                if operationId:
                    self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, fragment merged)")
                logger.info(f"Generation complete after {iteration} iterations: fragment merged")
                break

            # Document metadata comes from the first parsed response only
            if iteration == 1 and parsedResult and not documentMetadata:
                documentMetadata = self._extractDocumentMetadata(parsedResult)

            if iterationOperationId and extractedSections:
                self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections")

            if not extractedSections:
                # This iteration produced nothing usable; close its operation either way
                finishIterationOperation(iterationOperationId, False)
                if not wasJsonComplete:
                    # Broken/incomplete JSON: give the AI another attempt to complete it
                    logger.warning(f"Iteration {iteration}: No sections extracted from broken JSON, continuing for another attempt")
                    continue
                # Fragments were handled above, so complete JSON without sections is an error
                logger.warning(f"Iteration {iteration}: No sections extracted from complete JSON, stopping")
                break

            # Size of the merged content, for progress display and debugging
            merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False)
            totalBytesGenerated = len(merged_json_str.encode('utf-8'))
            mergedBytesDisplay = formatBytes(totalBytesGenerated)
            if operationId:
                # Rough progress estimate based on the iteration count
                estimatedProgress = min(0.9, 0.4 + (iteration * 0.1))
                self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {mergedBytesDisplay} (iteration {iteration})")
            self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")

            # JSON completeness decides whether another iteration is needed
            shouldContinue = self._shouldContinueGeneration(
                allSections,
                iteration,
                wasJsonComplete,
                result
            )
            if shouldContinue:
                if iterationOperationId:
                    iterBytes = len(result.encode('utf-8')) if result else 0
                    self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Completed ({formatBytes(iterBytes)})")
                    self.services.chat.progressLogFinish(iterationOperationId, True)
                continue

            # Done: finish the iteration and report the final size on the main operation
            if iterationOperationId:
                self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Complete ({mergedBytesDisplay})")
                self.services.chat.progressLogFinish(iterationOperationId, True)
            if operationId:
                self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete: {mergedBytesDisplay} ({iteration} iterations, {len(allSections)} sections)")
            logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections")
            break
        except Exception as e:
            logger.error(f"Error in AI call iteration {iteration}: {str(e)}", exc_info=True)
            finishIterationOperation(iterationOperationId, False)
            break
    else:
        # Only reached when the iteration cap was exhausted (no break/return)
        logger.warning(f"AI call stopped after maximum iterations ({maxIterations})")

    # Close incomplete structures so the final JSON is valid even after an early stop
    allSections = JsonResponseHandler.completeIncompleteStructures(allSections)
    final_result = self._buildFinalResultFromSections(allSections, documentMetadata)
    self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
    return final_result

@staticmethod
def _formatByteCountForProgress(byteCount: int) -> str:
    """Format a byte count for progress messages as B, kB or MB (1024-based, one decimal)."""
    if byteCount < 1024:
        return f"{byteCount}B"
    if byteCount < 1024 * 1024:
        return f"{byteCount / 1024:.1f}kB"
    return f"{byteCount / (1024 * 1024):.1f}MB"
# JSON merging logic moved to subJsonResponseHandling.py
async def _defineKpisFromPrompt(
    self,
    userPrompt: str,
    rawJsonString: Optional[str],
    continuationContext: Dict[str, Any],
    debugPrefix: str = "kpi"
) -> List[Dict[str, Any]]:
    """
    Make a separate AI call to define KPIs from the user prompt and the incomplete JSON.

    The AI reply is validated before use. The caller spreads each KPI into a dict and
    compares ``currentValue < targetValue``, so two kinds of malformed output would
    otherwise abort the whole generation loop:
    - entries that are not objects are dropped;
    - a ``targetValue`` that is not a finite number is dropped. Numeric strings such as
      "50" are converted first.

    Args:
        userPrompt: Original user prompt.
        rawJsonString: Raw JSON string from the first iteration's response.
        continuationContext: Not used; kept for interface compatibility.
        debugPrefix: Prefix for the debug files of this call.

    Returns:
        List of KPI definitions, e.g.
        [{"id": str, "description": str, "jsonPath": str, "targetValue": int}, ...].
        Returns [] when the call or the parsing fails.
    """
    import math

    # Use the raw JSON string from the first iteration response
    if rawJsonString:
        # Remove markdown code fences if present
        from modules.shared.jsonUtils import stripCodeFences
        incompleteJson = stripCodeFences(rawJsonString.strip())
    else:
        incompleteJson = "Not available"
    kpiDefinitionPrompt = f"""Analyze the user request and incomplete JSON to define KPIs (Key Performance Indicators) for tracking progress.
User Request:
{userPrompt}
Delivered JSON part:
{incompleteJson}
Task: Define which JSON items should be tracked to measure completion progress.
IMPORTANT: Analyze the Delivered JSON part structure to understand what is being tracked:
1. Identify the structure type (table with rows, list with items, etc.)
2. Determine what the jsonPath actually counts (number of rows, number of items, etc.)
3. Calculate targetValue based on what is being tracked, NOT the total quantity requested
For each trackable item, provide:
- id: Unique identifier (use descriptive name)
- description: What this KPI measures (be specific about what is counted)
- jsonPath: Path to extract value from JSON (use dot notation with array indices, e.g., "documents[0].sections[1].elements[0].rows")
- targetValue: Target value to reach (integer) - MUST match what jsonPath actually tracks (rows count, items count, etc.)
Return ONLY valid JSON in this format:
{{
"kpis": [
{{
"id": "unique_id",
"description": "Description of what is measured",
"jsonPath": "path.to.value",
"targetValue": 0
}}
]
}}
If no trackable items can be identified, return: {{"kpis": []}}
"""
    try:
        request = AiCallRequest(
            prompt=kpiDefinitionPrompt,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.SPEED,
                processingMode=ProcessingModeEnum.BASIC
            )
        )
        self.services.utils.writeDebugFile(kpiDefinitionPrompt, f"{debugPrefix}_kpi_definition_prompt")
        response = await self.callAi(request)
        self.services.utils.writeDebugFile(response.content, f"{debugPrefix}_kpi_definition_response")

        extracted = extractJsonString(response.content)
        kpiResponse = json.loads(extracted)
        rawKpis = kpiResponse.get("kpis", []) if isinstance(kpiResponse, dict) else []
        if not isinstance(rawKpis, list):
            rawKpis = []

        kpiDefinitions = []
        for kpi in rawKpis:
            if not isinstance(kpi, dict):
                logger.warning(f"Ignoring KPI definition that is not an object: {kpi!r}")
                continue
            if "targetValue" in kpi:
                targetValue = kpi["targetValue"]
                if isinstance(targetValue, str):
                    # The model sometimes quotes numbers ("50"); convert them
                    text = targetValue.strip()
                    try:
                        targetValue = int(text)
                    except ValueError:
                        try:
                            targetValue = float(text)
                        except ValueError:
                            targetValue = None
                isNumber = isinstance(targetValue, (int, float)) and not isinstance(targetValue, bool)
                if not isNumber or (isinstance(targetValue, float) and not math.isfinite(targetValue)):
                    logger.warning(f"Ignoring KPI '{kpi.get('id')}' with non-numeric targetValue: {kpi['targetValue']!r}")
                    continue
                kpi = {**kpi, "targetValue": targetValue}
            kpiDefinitions.append(kpi)

        logger.info(f"Defined {len(kpiDefinitions)} KPIs for tracking")
        return kpiDefinitions
    except Exception as e:
        logger.warning(f"Failed to define KPIs: {e}, continuing without KPI tracking")
        return []
def _extractSectionsFromResponse(
    self,
    result: str,
    iteration: int,
    debugPrefix: str,
    allSections: List[Dict[str, Any]] = None,
    accumulationState: Optional[JsonAccumulationState] = None
) -> Tuple[List[Dict[str, Any]], bool, Optional[Dict[str, Any]], Optional[JsonAccumulationState]]:
    """
    Turn one AI response into sections, tolerating broken JSON.

    Iteration 1:
    - If the response parses and JsonResponseHandler.isJsonComplete accepts it, its
      sections are returned with no accumulation state.
    - Otherwise partial sections are taken from the parsed response. When nothing parsed
      (or the parse was empty), they come from a repairBrokenJson repair instead.
    - A new JsonAccumulationState is then started with an empty KPI list; the async
      caller defines the KPIs.

    Later iterations:
    - The response is appended to the accumulated string via
      JsonResponseHandler.accumulateAndParseJsonFragments, and the state is updated in place.
    - Without an active accumulation state, an empty, incomplete result is returned.

    Returns:
        (sections, wasJsonComplete, parsedResult, accumulationState). The last element
        is None when no accumulation is active.
    """
    previousSections = [] if allSections is None else allSections

    if iteration != 1:
        if not (accumulationState and accumulationState.isAccumulationMode):
            # Unexpected: later iterations should always run in accumulation mode
            logger.warning(f"Iteration {iteration}: No accumulation state but iteration > 1")
            return [], False, None, None
        accumulated, newSections, isComplete, parsedResult = JsonResponseHandler.accumulateAndParseJsonFragments(
            accumulationState.accumulatedJsonString,
            result,
            previousSections,
            iteration
        )
        accumulationState.accumulatedJsonString = accumulated
        accumulationState.lastParsedResult = parsedResult
        accumulationState.allSections = (previousSections + newSections) if newSections else previousSections
        accumulationState.isAccumulationMode = not isComplete
        if parsedResult:
            # Debug snapshot of the accumulated document
            self.services.utils.writeDebugFile(
                json.dumps(parsedResult, indent=2, ensure_ascii=False),
                f"{debugPrefix}_accumulated_json_iteration_{iteration}.json"
            )
        return newSections, isComplete, parsedResult, accumulationState

    # First iteration: is the response already a complete document?
    parsed = None
    try:
        parsed = json.loads(extractJsonString(result))
        if JsonResponseHandler.isJsonComplete(parsed):
            completeSections = extractSectionsFromDocument(parsed)
            logger.info("Iteration 1: Complete JSON detected, no accumulation needed")
            return completeSections, True, parsed, None
    except Exception:
        pass  # Unparseable or incomplete - handled below

    logger.info("Iteration 1: Incomplete JSON detected, attempting to extract partial sections")
    partialSections = []
    if not parsed:
        # Nothing (or an empty value) parsed: try to repair the raw text
        try:
            repaired = repairBrokenJson(result)
            if repaired:
                partialSections = extractSectionsFromDocument(repaired)
                parsed = repaired  # the repaired version seeds the accumulation state
        except Exception:
            pass  # Repair failed; start accumulating without sections
    else:
        partialSections = extractSectionsFromDocument(parsed)

    # KPIs need an async AI call, so the caller fills them in afterwards
    freshState = JsonAccumulationState(
        accumulatedJsonString=result,
        isAccumulationMode=True,
        lastParsedResult=parsed,
        allSections=partialSections,
        kpis=[]
    )
    return partialSections, False, parsed, freshState
def _shouldContinueGeneration(
self,
allSections: List[Dict[str, Any]],
iteration: int,
wasJsonComplete: bool,
rawResponse: str = None
) -> bool:
"""
Determine if AI generation loop should continue.
CRITICAL: This is ONLY about AI Loop Completion, NOT Action DoD!
Action DoD is checked AFTER the AI Loop completes in _refineDecide.
Simple logic:
- If JSON parsing failed or incomplete → continue (needs more content)
- If JSON parses successfully and is complete → stop (all content delivered)
- Loop detection prevents infinite loops
CRITICAL: JSON completeness is determined by parsing, NOT by last character check!
Returns True if we should continue, False if AI Loop is done.
"""
if len(allSections) == 0:
return True # No sections yet, continue
# CRITERION 1: If JSON was incomplete/broken (parsing failed or incomplete) - continue to repair/complete
if not wasJsonComplete:
logger.info(f"Iteration {iteration}: JSON incomplete/broken - continuing to complete")
return True
# CRITERION 2: JSON is complete (parsed successfully) - check for loop detection
if self._isStuckInLoop(allSections, iteration):
logger.warning(f"Iteration {iteration}: Detected potential infinite loop - stopping AI loop")
return False
# JSON is complete and not stuck in loop - done
logger.info(f"Iteration {iteration}: JSON complete - AI loop done")
return False
def _isStuckInLoop(
self,
allSections: List[Dict[str, Any]],
iteration: int
) -> bool:
"""
Detect if we're stuck in a loop (same content being repeated).
Generic approach: Check if recent iterations are adding minimal or duplicate content.
"""
if iteration < 3:
return False # Need at least 3 iterations to detect a loop
if len(allSections) == 0:
return False
# Check if last section is very small (might be stuck)
lastSection = allSections[-1]
elements = lastSection.get("elements", [])
if isinstance(elements, list) and elements:
lastElem = elements[-1] if elements else {}
else:
lastElem = elements if isinstance(elements, dict) else {}
# Check content size of last section
lastSectionSize = 0
if isinstance(lastElem, dict):
for key, value in lastElem.items():
if isinstance(value, str):
lastSectionSize += len(value)
elif isinstance(value, list):
lastSectionSize += len(str(value))
# If last section is very small and we've done many iterations, might be stuck
if lastSectionSize < 100 and iteration > 10:
logger.warning(f"Potential loop detected: iteration {iteration}, last section size {lastSectionSize}")
return True
return False
def _extractDocumentMetadata(
self,
parsedResult: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
"""
Extract document metadata (title, filename) from parsed AI response.
Returns dict with 'title' and 'filename' keys if found, None otherwise.
"""
if not isinstance(parsedResult, dict):
return None
# Try to get from documents array (preferred structure)
if "documents" in parsedResult and isinstance(parsedResult["documents"], list) and len(parsedResult["documents"]) > 0:
firstDoc = parsedResult["documents"][0]
if isinstance(firstDoc, dict):
title = firstDoc.get("title")
filename = firstDoc.get("filename")
if title or filename:
return {
"title": title,
"filename": filename
}
return None
def _buildFinalResultFromSections(
self,
allSections: List[Dict[str, Any]],
documentMetadata: Optional[Dict[str, Any]] = None
) -> str:
"""
Build final JSON result from accumulated sections.
Uses AI-provided metadata (title, filename) if available.
"""
if not allSections:
return ""
# Extract metadata from AI response if available
title = "Generated Document"
filename = "document.json"
if documentMetadata:
if documentMetadata.get("title"):
title = documentMetadata["title"]
if documentMetadata.get("filename"):
filename = documentMetadata["filename"]
# Build documents structure
# Assuming single document for now
documents = [{
"id": "doc_1",
"title": title,
"filename": filename,
"sections": allSections
}]
result = {
"metadata": {
"split_strategy": "single_document",
"source_documents": [],
"extraction_method": "ai_generation"
},
"documents": documents
}
return json.dumps(result, indent=2)
# Public API Methods
# Planning AI Call
async def callAiPlanning(
    self,
    prompt: str,
    placeholders: Optional[List[PromptPlaceholder]] = None,
    debugType: Optional[str] = None
) -> str:
    """
    Run a planning AI call (task planning, action planning, action selection, etc.).

    The call always uses fixed options: operation type PLAN, QUALITY priority,
    DETAILED processing mode, and no prompt/context compression.

    Args:
        prompt: The planning prompt.
        placeholders: Optional placeholder replacements. Each item's ``label`` is
            mapped to its ``content`` and applied via ``_buildPromptWithPlaceholders``.
        debugType: Prefix for the debug files written for the prompt and the
            response (e.g. 'taskplan', 'dynamic', 'intentanalysis'); 'plan' when omitted.

    Returns:
        The response content, or an empty string when the AI returns no content.
    """
    await self.ensureAiObjectsInitialized()

    planningOptions = AiCallOptions(
        operationType=OperationTypeEnum.PLAN,
        priority=PriorityEnum.QUALITY,
        processingMode=ProcessingModeEnum.DETAILED,
        compressPrompt=False,
        compressContext=False
    )

    if not placeholders:
        finalPrompt = prompt
    else:
        replacements = {item.label: item.content for item in placeholders}
        finalPrompt = self._buildPromptWithPlaceholders(prompt, replacements)

    # One direct call with no section-based looping; the response text is returned as-is.
    planningRequest = AiCallRequest(prompt=finalPrompt, context="", options=planningOptions)

    # Persist prompt and response under a context-specific debug name.
    filePrefix = debugType or "plan"
    self.services.utils.writeDebugFile(finalPrompt, f"{filePrefix}_prompt")
    aiResponse = await self.aiObjects.callWithTextContext(planningRequest)
    planText = aiResponse.content or ""
    self.services.utils.writeDebugFile(planText, f"{filePrefix}_response")
    return planText
async def callAiContent(
    self,
    prompt: str,
    options: AiCallOptions,
    contentParts: Optional[List[ContentPart]] = None,
    outputFormat: Optional[str] = None,
    title: Optional[str] = None,
    parentOperationId: Optional[str] = None  # Parent operation ID for hierarchical logging
) -> AiResponse:
    """
    Unified AI content processing method (replaces callAiDocuments and callAiText).

    Dispatches on ``options.operationType``, which is set to DATA_GENERATE when unset:
    - IMAGE_GENERATE: a single AI call whose content is wrapped as a PNG DocumentData.
    - WEB_SEARCH / WEB_CRAWL: a single AI call whose content is returned directly.
    - anything else: optional extraction from ``contentParts``, looped JSON
      generation, JSON parsing, and rendering to ``outputFormat`` via GenerationService.

    Progress is tracked under a dedicated operation ID that is a child of
    ``parentOperationId``. That operation is finished with success right before
    returning. When an exception escapes, it is finished with failure once, by the
    outermost handler.

    Args:
        prompt: The main prompt for the AI call.
        options: AI call configuration options. Mutated in place: operationType is
            defaulted, and compressPrompt/compressContext are forced to False for
            document generation.
        contentParts: Optional list of already-extracted content parts.
        outputFormat: Optional output format (e.g. 'pdf', 'docx', 'xlsx'); "txt" when omitted.
        title: Optional title for generated documents.
        parentOperationId: Optional parent operation ID for hierarchical logging.

    Returns:
        AiResponse with content, metadata, and optional documents.

    Raises:
        ValueError: no image/web content returned, generated content is not a
            JSON object, or rendering failed. Other exceptions from dependencies propagate.
    """
    await self.ensureAiObjectsInitialized()

    # Create separate operationId for detailed progress tracking
    workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
    aiOperationId = f"ai_content_{workflowId}_{int(time.time())}"

    # parentOperationId is already the operationId of the parent (not a log entry ID)
    self.services.chat.progressLogStart(
        aiOperationId,
        "AI content processing",
        "Content Processing",
        f"Format: {outputFormat or 'text'}",
        parentOperationId=parentOperationId
    )

    try:
        # Default outputFormat to "txt" if not specified (unified path - all formats handled the same way)
        if not outputFormat:
            outputFormat = "txt"

        # Extraction is separate - contentParts must be extracted before calling.
        opType = getattr(options, "operationType", None)
        if not opType:
            options.operationType = OperationTypeEnum.DATA_GENERATE
            opType = OperationTypeEnum.DATA_GENERATE

        # Handle IMAGE_GENERATE operations
        if opType == OperationTypeEnum.IMAGE_GENERATE:
            self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for image generation")
            request = AiCallRequest(
                prompt=prompt,
                context="",
                options=options
            )
            response = await self.callAi(request)
            if not response.content:
                # The outer handler closes the progress operation; closing it here
                # as well would finish it twice.
                errorMsg = f"No image data returned: {response.content}"
                logger.error(f"Error in AI image generation: {errorMsg}")
                raise ValueError(errorMsg)

            imageDoc = DocumentData(
                documentName="generated_image.png",
                documentData=response.content,
                mimeType="image/png"
            )
            metadata = AiResponseMetadata(
                title=title or "Generated Image",
                operationType=opType.value
            )
            self.services.chat.storeWorkflowStat(
                self.services.workflow,
                response,
                "ai.generate.image"
            )
            self.services.chat.progressLogUpdate(aiOperationId, 0.9, "Image generated")
            # Build the response before marking success so a construction error
            # cannot follow a success finish.
            result = AiResponse(
                content=response.content,
                metadata=metadata,
                documents=[imageDoc]
            )
            self.services.chat.progressLogFinish(aiOperationId, True)
            return result

        # Handle WEB_SEARCH and WEB_CRAWL operations
        if opType == OperationTypeEnum.WEB_SEARCH or opType == OperationTypeEnum.WEB_CRAWL:
            self.services.chat.progressLogUpdate(aiOperationId, 0.4, f"Calling AI for {opType.name}")
            request = AiCallRequest(
                prompt=prompt,  # Raw JSON prompt - connector will parse it
                context="",
                options=options
            )
            response = await self.callAi(request)
            if not response.content:
                errorMsg = f"No content returned from {opType.name}: {response.content}"
                logger.error(f"Error in {opType.name}: {errorMsg}")
                raise ValueError(errorMsg)

            metadata = AiResponseMetadata(
                operationType=opType.value
            )
            self.services.chat.storeWorkflowStat(
                self.services.workflow,
                response,
                f"ai.{opType.name.lower()}"
            )
            self.services.chat.progressLogUpdate(aiOperationId, 0.9, f"{opType.name} completed")
            result = AiResponse(
                content=response.content,
                metadata=metadata
            )
            self.services.chat.progressLogFinish(aiOperationId, True)
            return result

        # Document generation: all formats (txt, docx, xlsx, pdf, ...) share one path.
        # CRITICAL: For document generation with JSON templates, NEVER compress the prompt
        options.compressPrompt = False
        options.compressContext = False

        # Extract content from contentParts (if provided) for the generation prompt.
        # callAi with contentParts processes images with vision models and merges all results.
        content_for_generation = None
        if contentParts:
            # Filter out binary/other parts that shouldn't be processed
            processableParts = []
            skippedParts = []
            for p in contentParts:
                if p.typeGroup in ["image", "text", "table", "structure"] or (p.mimeType and (p.mimeType.startswith("image/") or p.mimeType.startswith("text/"))):
                    processableParts.append(p)
                else:
                    skippedParts.append(p)
            if skippedParts:
                logger.debug(f"Skipping {len(skippedParts)} binary/other parts from document generation")

            if processableParts:
                # Count images for progress update
                imageCount = len([p for p in processableParts if p.typeGroup == "image" or (p.mimeType and p.mimeType.startswith("image/"))])
                if imageCount > 0:
                    self.services.chat.progressLogUpdate(aiOperationId, 0.25, f"Extracting data from {imageCount} images using vision models")

                # Build a focused extraction prompt (not the user's generation prompt)
                from modules.services.serviceExtraction.subPromptBuilderExtraction import buildExtractionPrompt

                # Determine renderer for format-specific guidelines (best effort)
                renderer = None
                if outputFormat:
                    try:
                        from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
                        generationService = GenerationService(self.services)
                        renderer = generationService.getRendererForFormat(outputFormat)
                    except Exception as e:
                        logger.debug(f"Could not get renderer for format {outputFormat}: {e}")

                extractionPrompt = await buildExtractionPrompt(
                    outputFormat=outputFormat or "txt",
                    userPrompt=prompt,  # User's prompt as context for what to extract
                    title=title or "Document",
                    aiService=self if hasattr(self, 'aiObjects') and self.aiObjects else None,
                    services=self.services,
                    renderer=renderer
                )
                logger.info(f"Processing {len(processableParts)} content parts ({imageCount} images) with extraction prompt")

                extractionOptions = AiCallOptions(
                    operationType=OperationTypeEnum.DATA_EXTRACT,
                    compressPrompt=options.compressPrompt,
                    compressContext=options.compressContext
                )
                extractionRequest = AiCallRequest(
                    prompt=extractionPrompt,
                    context="",
                    options=extractionOptions,
                    contentParts=processableParts
                )

                self.services.utils.writeDebugFile(extractionPrompt, "content_extraction_prompt")
                # Generic content parts processor - handles images, text, chunking, merging
                extractionResponse = await self.callAi(extractionRequest)
                if extractionResponse.content:
                    self.services.utils.writeDebugFile(extractionResponse.content, "content_extraction_response")
                else:
                    self.services.utils.writeDebugFile(f"Error: No content returned (errorCount={extractionResponse.errorCount})", "content_extraction_response")
                    logger.warning(f"Content extraction returned no content (errorCount={extractionResponse.errorCount})")

                if extractionResponse.errorCount == 0 and extractionResponse.content:
                    # The extracted content is already merged and ready to use
                    content_for_generation = extractionResponse.content
                    logger.info(f"Successfully extracted content from {len(processableParts)} parts ({len(extractionResponse.content)} chars) for document generation")
                else:
                    # Extraction failed - use placeholders
                    logger.warning("Content extraction failed, using placeholders")
                    placeholderParts = [f"[{p.typeGroup}: {p.label} - Extraction failed]" for p in processableParts]
                    content_for_generation = "\n\n".join(placeholderParts) if placeholderParts else None
            else:
                logger.debug("No processable parts found in contentParts")

        self.services.chat.progressLogUpdate(aiOperationId, 0.3, "Building generation prompt")
        from modules.services.serviceGeneration.subPromptBuilderGeneration import buildGenerationPrompt
        generation_prompt = await buildGenerationPrompt(
            outputFormat, prompt, title, content_for_generation, None, self.services
        )
        promptArgs = {
            "outputFormat": outputFormat,
            "userPrompt": prompt,
            "title": title,
            "extracted_content": content_for_generation,
            "services": self.services
        }

        self.services.chat.progressLogUpdate(aiOperationId, 0.4, "Calling AI for content generation")
        # User prompt for task completion analysis inside the looping call (None when empty)
        userPrompt = promptArgs["userPrompt"] or None

        # The looping function updates progress with byte counts
        generated_json = await self._callAiWithLooping(
            generation_prompt,
            options,
            "document_generation",
            buildGenerationPrompt,
            promptArgs,
            aiOperationId,
            userPrompt=userPrompt
        )

        # Human-readable final size for the progress message
        finalSize = len(generated_json.encode('utf-8')) if generated_json else 0
        if finalSize < 1024:
            finalSizeDisplay = f"{finalSize}B"
        elif finalSize < 1024 * 1024:
            finalSizeDisplay = f"{finalSize / 1024:.1f}kB"
        else:
            finalSizeDisplay = f"{finalSize / (1024 * 1024):.1f}MB"
        self.services.chat.progressLogUpdate(aiOperationId, 0.7, f"Parsing generated JSON ({finalSizeDisplay})")

        try:
            extracted_json = self.services.utils.jsonExtractString(generated_json)
            generated_data = json.loads(extracted_json)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse generated JSON: {str(e)}")
            self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
            raise ValueError(f"Generated content is not valid JSON: {str(e)}") from e

        # The steps below add a "metadata" key and render a document structure,
        # which requires a JSON object; reject lists/strings/numbers explicitly.
        if not isinstance(generated_data, dict):
            self.services.utils.writeDebugFile(generated_json, "failed_json_parsing")
            raise ValueError(f"Generated content is not a JSON object (got {type(generated_data).__name__})")

        # Extract title and filename from the first generated document, if present
        extractedTitle = title
        extractedFilename = None
        docs = generated_data.get("documents")
        if isinstance(docs, list) and docs:
            firstDoc = docs[0]
            if isinstance(firstDoc, dict):
                if firstDoc.get("title"):
                    extractedTitle = firstDoc["title"]
                if firstDoc.get("filename"):
                    extractedFilename = firstDoc["filename"]

        # Ensure metadata contains the extracted title
        if "metadata" not in generated_data:
            generated_data["metadata"] = {}
        if extractedTitle:
            generated_data["metadata"]["title"] = extractedTitle

        # Separate child operation for content rendering
        renderOperationId = f"{aiOperationId}_render"
        self.services.chat.progressLogStart(
            renderOperationId,
            "Content Rendering",
            "Rendering",
            f"Format: {outputFormat}",
            parentOperationId=aiOperationId
        )
        # Tracks whether the render operation was already closed, so a later
        # failure in this block does not finish it a second time.
        renderLogFinished = False
        try:
            from modules.services.serviceGeneration.mainServiceGeneration import GenerationService
            generationService = GenerationService(self.services)
            self.services.chat.progressLogUpdate(renderOperationId, 0.5, f"Rendering to {outputFormat} format")
            rendered_content, mime_type = await generationService.renderReport(
                generated_data, outputFormat, extractedTitle or "Generated Document", prompt, self
            )
            self.services.chat.progressLogFinish(renderOperationId, True)
            renderLogFinished = True

            # Determine document name
            # NOTE(review): extractedFilename comes straight from AI output and is not
            # sanitized (unlike the title-derived name); confirm downstream consumers
            # of documentName do not use it as a filesystem path.
            if extractedFilename:
                documentName = extractedFilename
            elif extractedTitle and extractedTitle != "Generated Document":
                sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", extractedTitle)
                sanitized = re.sub(r"_+", "_", sanitized).strip("_")
                if sanitized:
                    if not sanitized.lower().endswith(f".{outputFormat}"):
                        documentName = f"{sanitized}.{outputFormat}"
                    else:
                        documentName = sanitized
                else:
                    documentName = f"generated.{outputFormat}"
            else:
                documentName = f"generated.{outputFormat}"

            docData = DocumentData(
                documentName=documentName,
                documentData=rendered_content,
                mimeType=mime_type,
                sourceJson=generated_data  # Preserve source JSON for structure validation
            )
            metadata = AiResponseMetadata(
                title=extractedTitle or title or "Generated Document",
                filename=extractedFilename,
                operationType=opType.value if opType else None
            )

            # Write JSON with proper formatting (not str() which can truncate)
            jsonStr = json.dumps(generated_data, indent=2, ensure_ascii=False)
            self.services.utils.writeDebugFile(jsonStr, "document_generation_response")

            result = AiResponse(
                content=json.dumps(generated_data),
                metadata=metadata,
                documents=[docData]
            )
            self.services.chat.progressLogFinish(aiOperationId, True)
            return result
        except Exception as e:
            logger.error(f"Error rendering document: {str(e)}")
            if not renderLogFinished:
                self.services.chat.progressLogFinish(renderOperationId, False)
            # aiOperationId is closed by the outer handler below.
            raise ValueError(f"Rendering failed: {str(e)}") from e

    except Exception as e:
        # Single place where the AI content operation is finished as failed.
        logger.error(f"Error in callAiContent: {str(e)}")
        self.services.chat.progressLogFinish(aiOperationId, False)
        raise