# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

"""
AI Call Looping Module

Handles AI calls with looping and repair logic, including:
- Looping with JSON repair and continuation
- KPI definition and tracking
- Progress tracking and iteration management
"""

import json
import logging
from typing import Dict, Any, List, Optional, Callable

from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState
from modules.datamodels.datamodelExtraction import ContentPart
from modules.shared.jsonUtils import buildContinuationContext, extractJsonString
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler

logger = logging.getLogger(__name__)


class AiCallLooper:
    """Handles AI calls with looping and repair logic."""

    def __init__(self, services, aiService, responseParser):
        """Initialize AiCallLooper with service center, AI service, and response parser access."""
        self.services = services
        self.aiService = aiService
        self.responseParser = responseParser

    async def callAiWithLooping(
        self,
        prompt: str,
        options: AiCallOptions,
        debugPrefix: str = "ai_call",
        promptBuilder: Optional[Callable] = None,
        promptArgs: Optional[Dict[str, Any]] = None,
        operationId: Optional[str] = None,
        userPrompt: Optional[str] = None,
        contentParts: Optional[List[ContentPart]] = None  # ARCHITECTURE: Support ContentParts for large content
    ) -> str:
        """
        Shared core function for AI calls with repair-based looping system.
        Automatically repairs broken JSON and continues generation seamlessly.

        Args:
            prompt: The prompt to send to AI
            options: AI call configuration options
            debugPrefix: Prefix for debug file names
            promptBuilder: Optional function to rebuild prompts for continuation
            promptArgs: Optional arguments for prompt builder
            operationId: Optional operation ID for progress tracking
            userPrompt: Optional user prompt for KPI definition
            contentParts: Optional content parts for first iteration

        Returns:
            Complete AI response after all iterations
        """
        maxIterations = 50  # Prevent infinite loops
        iteration = 0
        allSections = []  # Accumulate all sections across iterations
        lastRawResponse = None  # Store last raw JSON response for continuation
        documentMetadata = None  # Store document metadata (title, filename) from first iteration
        accumulationState = None  # Track accumulation state for string accumulation

        # Get parent operation ID for iteration operations (parentId should be operationId, not log entry ID)
        parentOperationId = operationId  # Use the parent's operationId directly

        while iteration < maxIterations:
            iteration += 1

            # Create separate operation for each iteration with parent reference
            iterationOperationId = None
            if operationId:
                iterationOperationId = f"{operationId}_iter_{iteration}"
                self.services.chat.progressLogStart(
                    iterationOperationId, "AI Call", f"Iteration {iteration}", "",
                    parentOperationId=parentOperationId
                )

            # Build iteration prompt
            # CRITICAL: Build continuation prompt if we have sections OR if we have a previous response (even if broken)
            # This ensures continuation prompts are built even when JSON is so broken that no sections can be extracted
            if (len(allSections) > 0 or lastRawResponse) and promptBuilder and promptArgs:
                # This is a continuation - build continuation context with raw JSON and rebuild prompt
                continuationContext = buildContinuationContext(allSections, lastRawResponse)
                if not lastRawResponse:
                    logger.warning(f"Iteration {iteration}: No previous response available for continuation!")

                # Filter promptArgs to only include parameters that buildGenerationPrompt accepts
                # buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext, services
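                # Illustrative example (hypothetical values): given
                #   promptArgs = {"outputFormat": "json", "userPrompt": "...", "maxPages": 3}
                # the comprehension below keeps only the accepted keys, so
                #   filteredPromptArgs == {"outputFormat": "json", "userPrompt": "..."}
                # and unsupported keys like "maxPages" are silently dropped.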
                filteredPromptArgs = {
                    k: v for k, v in promptArgs.items()
                    if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content', 'services']
                }
                # Always include services if available
                if not filteredPromptArgs.get('services') and hasattr(self, 'services'):
                    filteredPromptArgs['services'] = self.services

                # Rebuild prompt with continuation context using the provided prompt builder
                iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext)
            else:
                # First iteration - use original prompt
                iterationPrompt = prompt

            # Make AI call
            try:
                if iterationOperationId:
                    self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model")

                # ARCHITECTURE: Pass ContentParts directly to AiCallRequest
                # This allows model-aware chunking to handle large content properly
                # ContentParts are only passed in first iteration (continuations don't need them)
                request = AiCallRequest(
                    prompt=iterationPrompt,
                    context="",
                    options=options,
                    contentParts=contentParts if iteration == 1 else None  # Only pass ContentParts in first iteration
                )

                # Write the ACTUAL prompt sent to AI
                if iteration == 1:
                    self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
                else:
                    self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")

                response = await self.aiService.callAi(request)
                result = response.content

                # Track bytes for progress reporting
                bytesReceived = len(result.encode('utf-8')) if result else 0
                totalBytesSoFar = sum(
                    len(section.get('content', '').encode('utf-8')) if isinstance(section.get('content'), str) else 0
                    for section in allSections
                ) + bytesReceived
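                # Worked example for the formatting below (illustrative):
                #   512 -> "512B", 1536 -> "1.5kB", 3145728 -> "3.0MB"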
                # Update progress after AI call with byte information
                if iterationOperationId:
                    # Format bytes for display (kB or MB)
                    if totalBytesSoFar < 1024:
                        bytesDisplay = f"{totalBytesSoFar}B"
                    elif totalBytesSoFar < 1024 * 1024:
                        bytesDisplay = f"{totalBytesSoFar / 1024:.1f}kB"
                    else:
                        bytesDisplay = f"{totalBytesSoFar / (1024 * 1024):.1f}MB"
                    self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})")

                # Write raw AI response to debug file
                if iteration == 1:
                    self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
                else:
                    self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")

                # Emit stats for this iteration (only if workflow exists and has id)
                if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
                    try:
                        self.services.chat.storeWorkflowStat(
                            self.services.workflow, response, f"ai.call.{debugPrefix}.iteration_{iteration}"
                        )
                    except Exception as statError:
                        # Don't break the main loop if stat storage fails
                        logger.warning(f"Failed to store workflow stat: {str(statError)}")

                # Check for error response using generic error detection (errorCount > 0 or modelName == "error")
                if hasattr(response, 'errorCount') and response.errorCount > 0:
                    errorMsg = f"Iteration {iteration}: Error response detected (errorCount={response.errorCount}), stopping loop: {result[:200] if result else 'empty'}"
                    logger.error(errorMsg)
                    break
                if hasattr(response, 'modelName') and response.modelName == "error":
                    errorMsg = f"Iteration {iteration}: Error response detected (modelName=error), stopping loop: {result[:200] if result else 'empty'}"
                    logger.error(errorMsg)
                    break

                if not result or not result.strip():
                    logger.warning(f"Iteration {iteration}: Empty response, stopping")
                    break

                # Check if this is a text response (not document generation)
                # Text responses don't need JSON parsing - return immediately after first successful response
                isTextResponse = (promptBuilder is None and promptArgs is None) or debugPrefix == "text"
                if isTextResponse:
                    # For text responses, return the text immediately - no JSON parsing needed
                    logger.info(f"Iteration {iteration}: Text response received, returning immediately")
                    if iterationOperationId:
                        self.services.chat.progressLogFinish(iterationOperationId, True)
                    return result

                # Store raw response for continuation (even if broken)
                lastRawResponse = result

                # Extract sections from response (handles both valid and broken JSON)
                # Only for document generation (JSON responses)
                # CRITICAL: Pass allSections and accumulationState to enable string accumulation
                extractedSections, wasJsonComplete, parsedResult, accumulationState = self.responseParser.extractSectionsFromResponse(
                    result, iteration, debugPrefix, allSections, accumulationState
                )

                # CRITICAL: Merge sections BEFORE KPI validation
                # This ensures sections are preserved even if KPI validation fails
                if extractedSections:
                    allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration)

                # Define KPIs if we just entered accumulation mode (iteration 1, incomplete JSON)
                if accumulationState and accumulationState.isAccumulationMode and iteration == 1 and not accumulationState.kpis:
                    logger.info(f"Iteration {iteration}: Defining KPIs for accumulation tracking")
                    continuationContext = buildContinuationContext(allSections, result)
                    # Pass raw response string from first iteration for KPI definition
                    kpiDefinitions = await self._defineKpisFromPrompt(
                        userPrompt or prompt,
                        result,  # Pass raw JSON string from first iteration
                        continuationContext,
                        debugPrefix
                    )
                    # Initialize KPIs with currentValue = 0
                    accumulationState.kpis = [{**kpi, "currentValue": 0} for kpi in kpiDefinitions]
                    logger.info(f"Defined {len(accumulationState.kpis)} KPIs: {[kpi.get('id') for kpi in accumulationState.kpis]}")

                # Extract and validate KPIs (if in accumulation mode with KPIs defined)
                if accumulationState and accumulationState.isAccumulationMode and accumulationState.kpis:
                    # For KPI extraction, prefer accumulated JSON string over repaired JSON
                    # because repairBrokenJson may lose data (e.g., empty rows array when JSON is incomplete)
                    updatedKpis = []

                    # First try to extract from parsedResult (repaired JSON)
                    if parsedResult:
                        try:
                            updatedKpis = JsonResponseHandler.extractKpiValuesFromJson(
                                parsedResult, accumulationState.kpis
                            )
                            # Check if we got meaningful values (non-zero)
                            hasValidValues = any(kpi.get("currentValue", 0) > 0 for kpi in updatedKpis)
                            if not hasValidValues and accumulationState.accumulatedJsonString:
                                # Repaired JSON has empty values, try accumulated string
                                logger.debug("Repaired JSON has empty KPI values, trying accumulated JSON string")
                                updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson(
                                    accumulationState.accumulatedJsonString, accumulationState.kpis
                                )
                        except Exception as e:
                            logger.debug(f"Error extracting KPIs from parsedResult: {e}")
                            updatedKpis = []

                    # If no parsedResult or extraction failed, try accumulated string
                    if not updatedKpis and accumulationState.accumulatedJsonString:
                        try:
                            updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson(
                                accumulationState.accumulatedJsonString, accumulationState.kpis
                            )
                        except Exception as e:
                            logger.debug(f"Error extracting KPIs from accumulated JSON string: {e}")
                            updatedKpis = []
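                    # Illustrative shape (hypothetical values, fields per _defineKpisFromPrompt):
                    #   updatedKpis == [{"id": "table_rows",
                    #                    "description": "Number of table rows generated",
                    #                    "jsonPath": "documents[0].sections[0].elements[0].rows",
                    #                    "targetValue": 50, "currentValue": 18}]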
                    if updatedKpis:
                        shouldProceed, reason = JsonResponseHandler.validateKpiProgression(
                            accumulationState, updatedKpis
                        )
                        if not shouldProceed:
                            logger.warning(f"Iteration {iteration}: KPI validation failed: {reason}")
                            if iterationOperationId:
                                self.services.chat.progressLogFinish(iterationOperationId, False)
                            if operationId:
                                self.services.chat.progressLogUpdate(operationId, 0.9, f"KPI validation failed: {reason} ({iteration} iterations)")
                            break

                        # Update KPIs in accumulation state
                        accumulationState.kpis = updatedKpis
                        logger.info(f"Iteration {iteration}: KPIs updated: {[(kpi.get('id'), kpi.get('currentValue')) for kpi in updatedKpis]}")

                        # Check if all KPIs completed
                        allCompleted = True
                        for kpi in updatedKpis:
                            targetValue = kpi.get("targetValue", 0)
                            currentValue = kpi.get("currentValue", 0)
                            if currentValue < targetValue:
                                allCompleted = False
                                break

                        if allCompleted:
                            logger.info(f"Iteration {iteration}: All KPIs completed, finishing accumulation")
                            wasJsonComplete = True  # Mark as complete to exit loop

                # CRITICAL: Handle JSON fragments (continuation content)
                # Fragment merging happens inside extractSectionsFromResponse
                # If merge fails (returns wasJsonComplete=True), stop iterations and complete JSON
                if not extractedSections and allSections:
                    if wasJsonComplete:
                        # Merge failed - stop iterations, complete JSON with available data
                        logger.error(f"Iteration {iteration}: ❌ MERGE FAILED - Stopping iterations, completing JSON with available data")
                        if iterationOperationId:
                            self.services.chat.progressLogFinish(iterationOperationId, False)
                        if operationId:
                            self.services.chat.progressLogUpdate(operationId, 0.9, f"Merge failed, completing JSON ({iteration} iterations)")
                        break

                    # Fragment was detected and merged successfully
                    logger.info(f"Iteration {iteration}: JSON fragment detected and merged, continuing")
                    # Don't break - fragment was merged, continue to get more content if needed

                    # Check if we should continue based on JSON completeness
                    shouldContinue = self.responseParser.shouldContinueGeneration(
                        allSections, iteration, wasJsonComplete, result
                    )
                    if shouldContinue:
                        if iterationOperationId:
                            self.services.chat.progressLogUpdate(iterationOperationId, 0.8, "Fragment merged, continuing")
                            self.services.chat.progressLogFinish(iterationOperationId, True)
                        continue
                    else:
                        # Done - fragment was merged and JSON is complete
                        if iterationOperationId:
                            self.services.chat.progressLogFinish(iterationOperationId, True)
                        if operationId:
                            self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, fragment merged)")
                        logger.info(f"Generation complete after {iteration} iterations: fragment merged")
                        break

                # Extract document metadata from first iteration if available
                if iteration == 1 and parsedResult and not documentMetadata:
                    documentMetadata = self.responseParser.extractDocumentMetadata(parsedResult)

                # Update progress after parsing
                if iterationOperationId:
                    if extractedSections:
                        self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections")

                if not extractedSections:
                    # CRITICAL: If JSON was incomplete/broken, continue even if no sections extracted
                    # This allows the AI to retry and complete the broken JSON
                    if not wasJsonComplete:
                        logger.warning(f"Iteration {iteration}: No sections extracted from broken JSON, continuing for another attempt")
                        continue
                    # If JSON was complete but no sections extracted - check if it was a fragment
                    # Fragments are handled above, so if we get here and it's complete, it's an error
                    logger.warning(f"Iteration {iteration}: No sections extracted from complete JSON, stopping")
                    break

                # NOTE: Section merging now happens BEFORE KPI validation (see above)
                # This ensures sections are preserved even if KPI validation fails

                # Calculate total bytes in merged content for progress display
                merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False)
                totalBytesGenerated = len(merged_json_str.encode('utf-8'))

                # Update main operation with byte progress
                if operationId:
                    # Format bytes for display
                    if totalBytesGenerated < 1024:
                        bytesDisplay = f"{totalBytesGenerated}B"
                    elif totalBytesGenerated < 1024 * 1024:
                        bytesDisplay = f"{totalBytesGenerated / 1024:.1f}kB"
                    else:
                        bytesDisplay = f"{totalBytesGenerated / (1024 * 1024):.1f}MB"
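                    # Rough iteration-to-progress mapping (illustrative): iteration 1 -> 0.5,
                    # 2 -> 0.6, 3 -> 0.7, capped at 0.9 from iteration 5 onward so the bar
                    # leaves headroom for finalization.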
                    # Estimate progress based on iterations (rough estimate)
                    estimatedProgress = min(0.9, 0.4 + (iteration * 0.1))
                    self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})")

                # Log merged sections for debugging
                self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")

                # Check if we should continue (completion detection)
                # Simple logic: JSON completeness determines continuation
                shouldContinue = self.responseParser.shouldContinueGeneration(
                    allSections, iteration, wasJsonComplete, result
                )

                if shouldContinue:
                    # Finish iteration operation (will continue with next iteration)
                    if iterationOperationId:
                        # Show byte progress in iteration completion
                        iterBytes = len(result.encode('utf-8')) if result else 0
                        if iterBytes < 1024:
                            iterBytesDisplay = f"{iterBytes}B"
                        elif iterBytes < 1024 * 1024:
                            iterBytesDisplay = f"{iterBytes / 1024:.1f}kB"
                        else:
                            iterBytesDisplay = f"{iterBytes / (1024 * 1024):.1f}MB"
                        self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Completed ({iterBytesDisplay})")
                        self.services.chat.progressLogFinish(iterationOperationId, True)
                    continue
                else:
                    # Done - finish iteration and update main operation
                    if iterationOperationId:
                        # Show final byte count
                        finalBytes = len(merged_json_str.encode('utf-8'))
                        if finalBytes < 1024:
                            finalBytesDisplay = f"{finalBytes}B"
                        elif finalBytes < 1024 * 1024:
                            finalBytesDisplay = f"{finalBytes / 1024:.1f}kB"
                        else:
                            finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB"
                        self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Complete ({finalBytesDisplay})")
                        self.services.chat.progressLogFinish(iterationOperationId, True)
                    if operationId:
                        # Show final size in main operation
                        finalBytes = len(merged_json_str.encode('utf-8'))
                        if finalBytes < 1024:
                            finalBytesDisplay = f"{finalBytes}B"
                        elif finalBytes < 1024 * 1024:
                            finalBytesDisplay = f"{finalBytes / 1024:.1f}kB"
                        else:
                            finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB"
                        self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete: {finalBytesDisplay} ({iteration} iterations, {len(allSections)} sections)")
                    logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections")
                    break

            except Exception as e:
                logger.error(f"Error in AI call iteration {iteration}: {str(e)}")
                if iterationOperationId:
                    self.services.chat.progressLogFinish(iterationOperationId, False)
                break

        if iteration >= maxIterations:
            logger.warning(f"AI call stopped after maximum iterations ({maxIterations})")

        # CRITICAL: Complete any incomplete structures in sections before building final result
        # This ensures JSON is properly closed even if merge failed or iterations stopped early
        allSections = JsonResponseHandler.completeIncompleteStructures(allSections)
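        # Illustrative effect (assumption about JsonResponseHandler internals): a section
        # whose accumulated content stopped mid-structure, e.g. '{"rows": [["a", "b"',
        # is presumably closed out to valid JSON such as '{"rows": [["a", "b"]]}' so the
        # final document always serializes cleanly.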
        # Build final result from accumulated sections
        final_result = self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)

        # Write final result to debug file
        self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")

        return final_result

    async def _defineKpisFromPrompt(
        self,
        userPrompt: str,
        rawJsonString: Optional[str],
        continuationContext: Dict[str, Any],
        debugPrefix: str = "kpi"
    ) -> List[Dict[str, Any]]:
        """
        Make separate AI call to define KPIs based on user prompt and incomplete JSON.

        Args:
            userPrompt: Original user prompt
            rawJsonString: Raw JSON string from first iteration response
            continuationContext: Continuation context (not used for JSON, kept for compatibility)
            debugPrefix: Prefix for debug file names

        Returns:
            List of KPI definitions: [{"id": str, "description": str, "jsonPath": str, "targetValue": int}, ...]
        """
        # Use raw JSON string from first iteration response
        if rawJsonString:
            # Remove markdown code fences if present
            from modules.shared.jsonUtils import stripCodeFences
            incompleteJson = stripCodeFences(rawJsonString.strip())
        else:
            incompleteJson = "Not available"

        kpiDefinitionPrompt = f"""Analyze the user request and incomplete JSON to define KPIs (Key Performance Indicators) for tracking progress.

User Request:
{userPrompt}

Delivered JSON part:
{incompleteJson}

Task: Define which JSON items should be tracked to measure completion progress.

IMPORTANT: Analyze the Delivered JSON part structure to understand what is being tracked:
1. Identify the structure type (table with rows, list with items, etc.)
2. Determine what the jsonPath actually counts (number of rows, number of items, etc.)
3. Calculate targetValue based on what is being tracked, NOT the total quantity requested

For each trackable item, provide:
- id: Unique identifier (use descriptive name)
- description: What this KPI measures (be specific about what is counted)
- jsonPath: Path to extract value from JSON (use dot notation with array indices, e.g., "documents[0].sections[1].elements[0].rows")
- targetValue: Target value to reach (integer) - MUST match what jsonPath actually tracks (rows count, items count, etc.)

Return ONLY valid JSON in this format:
{{
  "kpis": [
    {{
      "id": "unique_id",
      "description": "Description of what is measured",
      "jsonPath": "path.to.value",
      "targetValue": 0
    }}
  ]
}}

If no trackable items can be identified, return: {{"kpis": []}}
"""

        try:
            request = AiCallRequest(
                prompt=kpiDefinitionPrompt,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.DATA_ANALYSE,
                    priority=PriorityEnum.SPEED,
                    processingMode=ProcessingModeEnum.BASIC
                )
            )

            # Write KPI definition prompt to debug file
            self.services.utils.writeDebugFile(kpiDefinitionPrompt, f"{debugPrefix}_kpi_definition_prompt")

            response = await self.aiService.callAi(request)

            # Write KPI definition response to debug file
            self.services.utils.writeDebugFile(response.content, f"{debugPrefix}_kpi_definition_response")

            # Parse response
            extracted = extractJsonString(response.content)
            kpiResponse = json.loads(extracted)
            kpiDefinitions = kpiResponse.get("kpis", [])
            logger.info(f"Defined {len(kpiDefinitions)} KPIs for tracking")
            return kpiDefinitions

        except Exception as e:
            logger.warning(f"Failed to define KPIs: {e}, continuing without KPI tracking")
            return []
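

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only - the service objects, prompt builder, and
# option values below are assumptions, not part of this module):
#
#     looper = AiCallLooper(services, aiService, responseParser)
#     options = AiCallOptions(
#         operationType=OperationTypeEnum.DATA_ANALYSE,
#         priority=PriorityEnum.SPEED,
#         processingMode=ProcessingModeEnum.BASIC,
#     )
#     finalJson = await looper.callAiWithLooping(
#         prompt=generationPrompt,                 # hypothetical initial prompt
#         options=options,
#         debugPrefix="doc_generation",
#         promptBuilder=buildGenerationPrompt,     # hypothetical async builder
#         promptArgs={"outputFormat": "json", "userPrompt": userPrompt},
#         operationId="op_123",
#         userPrompt=userPrompt,
#     )
# ---------------------------------------------------------------------------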