From 9d4bd8ceef948b3891eb643b46caca706a116b6a Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Fri, 26 Dec 2025 00:16:08 +0100 Subject: [PATCH] refactored ai service container (3000 lines) with submodules, and enhanced generation part with dynamic chapters --- modules/services/serviceAi/mainServiceAi.py | 2377 +---------------- .../services/serviceAi/subAiCallLooping.py | 533 ++++ 2 files changed, 548 insertions(+), 2362 deletions(-) create mode 100644 modules/services/serviceAi/subAiCallLooping.py diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index f8ab4dad..777e6230 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -57,6 +57,7 @@ class AiService: from modules.services.serviceAi.subContentExtraction import ContentExtractor from modules.services.serviceAi.subStructureGeneration import StructureGenerator from modules.services.serviceAi.subStructureFilling import StructureFiller + from modules.services.serviceAi.subAiCallLooping import AiCallLooper if not hasattr(self, 'responseParser'): logger.info("Initializing ResponseParser...") @@ -77,6 +78,10 @@ class AiService: if not hasattr(self, 'structureFiller'): logger.info("Initializing StructureFiller...") self.structureFiller = StructureFiller(self.services, self) + + if not hasattr(self, 'aiCallLooper'): + logger.info("Initializing AiCallLooper...") + self.aiCallLooper = AiCallLooper(self.services, self, self.responseParser) async def callAi(self, request: AiCallRequest, progressCallback=None): """Router: handles content parts via extractionService, text context via interface. @@ -214,402 +219,10 @@ Respond with ONLY a JSON object in this exact format: userPrompt: Optional[str] = None, contentParts: Optional[List[ContentPart]] = None # ARCHITECTURE: Support ContentParts for large content ) -> str: - """ - Shared core function for AI calls with repair-based looping system. 
- Automatically repairs broken JSON and continues generation seamlessly. - - Args: - prompt: The prompt to send to AI - options: AI call configuration options - debugPrefix: Prefix for debug file names - promptBuilder: Optional function to rebuild prompts for continuation - promptArgs: Optional arguments for prompt builder - operationId: Optional operation ID for progress tracking - - Returns: - Complete AI response after all iterations - """ - maxIterations = 50 # Prevent infinite loops - iteration = 0 - allSections = [] # Accumulate all sections across iterations - lastRawResponse = None # Store last raw JSON response for continuation - documentMetadata = None # Store document metadata (title, filename) from first iteration - accumulationState = None # Track accumulation state for string accumulation - - # Get parent operation ID for iteration operations (parentId should be operationId, not log entry ID) - parentOperationId = operationId # Use the parent's operationId directly - - while iteration < maxIterations: - iteration += 1 - - # Create separate operation for each iteration with parent reference - iterationOperationId = None - if operationId: - iterationOperationId = f"{operationId}_iter_{iteration}" - self.services.chat.progressLogStart( - iterationOperationId, - "AI Call", - f"Iteration {iteration}", - "", - parentOperationId=parentOperationId - ) - - # Build iteration prompt - # CRITICAL: Build continuation prompt if we have sections OR if we have a previous response (even if broken) - # This ensures continuation prompts are built even when JSON is so broken that no sections can be extracted - if (len(allSections) > 0 or lastRawResponse) and promptBuilder and promptArgs: - # This is a continuation - build continuation context with raw JSON and rebuild prompt - continuationContext = buildContinuationContext(allSections, lastRawResponse) - if not lastRawResponse: - logger.warning(f"Iteration {iteration}: No previous response available for continuation!") - 
- # Filter promptArgs to only include parameters that buildGenerationPrompt accepts - # buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext, services - filteredPromptArgs = { - k: v for k, v in promptArgs.items() - if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content', 'services'] - } - # Always include services if available - if not filteredPromptArgs.get('services') and hasattr(self, 'services'): - filteredPromptArgs['services'] = self.services - - # Rebuild prompt with continuation context using the provided prompt builder - iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext) - else: - # First iteration - use original prompt - iterationPrompt = prompt - - # Make AI call - try: - if iterationOperationId: - self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model") - # ARCHITECTURE: Pass ContentParts directly to AiCallRequest - # This allows model-aware chunking to handle large content properly - # ContentParts are only passed in first iteration (continuations don't need them) - request = AiCallRequest( - prompt=iterationPrompt, - context="", - options=options, - contentParts=contentParts if iteration == 1 else None # Only pass ContentParts in first iteration - ) - - # Write the ACTUAL prompt sent to AI - if iteration == 1: - self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt") - else: - self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}") - - response = await self.callAi(request) - result = response.content - - # Track bytes for progress reporting - bytesReceived = len(result.encode('utf-8')) if result else 0 - totalBytesSoFar = sum(len(section.get('content', '').encode('utf-8')) if isinstance(section.get('content'), str) else 0 for section in allSections) + bytesReceived - - # Update progress after AI call with byte information - if iterationOperationId: - 
# Format bytes for display (kB or MB) - if totalBytesSoFar < 1024: - bytesDisplay = f"{totalBytesSoFar}B" - elif totalBytesSoFar < 1024 * 1024: - bytesDisplay = f"{totalBytesSoFar / 1024:.1f}kB" - else: - bytesDisplay = f"{totalBytesSoFar / (1024 * 1024):.1f}MB" - self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})") - - # Write raw AI response to debug file - if iteration == 1: - self.services.utils.writeDebugFile(result, f"{debugPrefix}_response") - else: - self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") - - # Emit stats for this iteration (only if workflow exists and has id) - if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id: - try: - self.services.chat.storeWorkflowStat( - self.services.workflow, - response, - f"ai.call.{debugPrefix}.iteration_{iteration}" - ) - except Exception as statError: - # Don't break the main loop if stat storage fails - logger.warning(f"Failed to store workflow stat: {str(statError)}") - - # Check for error response using generic error detection (errorCount > 0 or modelName == "error") - if hasattr(response, 'errorCount') and response.errorCount > 0: - errorMsg = f"Iteration {iteration}: Error response detected (errorCount={response.errorCount}), stopping loop: {result[:200] if result else 'empty'}" - logger.error(errorMsg) - break - - if hasattr(response, 'modelName') and response.modelName == "error": - errorMsg = f"Iteration {iteration}: Error response detected (modelName=error), stopping loop: {result[:200] if result else 'empty'}" - logger.error(errorMsg) - break - - if not result or not result.strip(): - logger.warning(f"Iteration {iteration}: Empty response, stopping") - break - - # Check if this is a text response (not document generation) - # Text responses don't need JSON parsing - return immediately after first successful response - isTextResponse = (promptBuilder is None and 
promptArgs is None) or debugPrefix == "text" - - if isTextResponse: - # For text responses, return the text immediately - no JSON parsing needed - logger.info(f"Iteration {iteration}: Text response received, returning immediately") - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, True) - return result - - # Store raw response for continuation (even if broken) - lastRawResponse = result - - # Extract sections from response (handles both valid and broken JSON) - # Only for document generation (JSON responses) - # CRITICAL: Pass allSections and accumulationState to enable string accumulation - extractedSections, wasJsonComplete, parsedResult, accumulationState = self._extractSectionsFromResponse( - result, iteration, debugPrefix, allSections, accumulationState - ) - - # CRITICAL: Merge sections BEFORE KPI validation - # This ensures sections are preserved even if KPI validation fails - if extractedSections: - allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration) - - # Define KPIs if we just entered accumulation mode (iteration 1, incomplete JSON) - if accumulationState and accumulationState.isAccumulationMode and iteration == 1 and not accumulationState.kpis: - logger.info(f"Iteration {iteration}: Defining KPIs for accumulation tracking") - continuationContext = buildContinuationContext(allSections, result) - # Pass raw response string from first iteration for KPI definition - kpiDefinitions = await self._defineKpisFromPrompt( - userPrompt or prompt, - result, # Pass raw JSON string from first iteration - continuationContext, - debugPrefix - ) - # Initialize KPIs with currentValue = 0 - accumulationState.kpis = [{**kpi, "currentValue": 0} for kpi in kpiDefinitions] - logger.info(f"Defined {len(accumulationState.kpis)} KPIs: {[kpi.get('id') for kpi in accumulationState.kpis]}") - - # Extract and validate KPIs (if in accumulation mode with KPIs defined) - if accumulationState and 
accumulationState.isAccumulationMode and accumulationState.kpis: - # For KPI extraction, prefer accumulated JSON string over repaired JSON - # because repairBrokenJson may lose data (e.g., empty rows array when JSON is incomplete) - updatedKpis = [] - - # First try to extract from parsedResult (repaired JSON) - if parsedResult: - try: - updatedKpis = JsonResponseHandler.extractKpiValuesFromJson( - parsedResult, - accumulationState.kpis - ) - # Check if we got meaningful values (non-zero) - hasValidValues = any(kpi.get("currentValue", 0) > 0 for kpi in updatedKpis) - if not hasValidValues and accumulationState.accumulatedJsonString: - # Repaired JSON has empty values, try accumulated string - logger.debug("Repaired JSON has empty KPI values, trying accumulated JSON string") - updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson( - accumulationState.accumulatedJsonString, - accumulationState.kpis - ) - except Exception as e: - logger.debug(f"Error extracting KPIs from parsedResult: {e}") - updatedKpis = [] - - # If no parsedResult or extraction failed, try accumulated string - if not updatedKpis and accumulationState.accumulatedJsonString: - try: - updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson( - accumulationState.accumulatedJsonString, - accumulationState.kpis - ) - except Exception as e: - logger.debug(f"Error extracting KPIs from accumulated JSON string: {e}") - updatedKpis = [] - - if updatedKpis: - shouldProceed, reason = JsonResponseHandler.validateKpiProgression( - accumulationState, - updatedKpis - ) - - if not shouldProceed: - logger.warning(f"Iteration {iteration}: KPI validation failed: {reason}") - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, False) - if operationId: - self.services.chat.progressLogUpdate(operationId, 0.9, f"KPI validation failed: {reason} ({iteration} iterations)") - break - - # Update KPIs in accumulation state - accumulationState.kpis = updatedKpis - 
logger.info(f"Iteration {iteration}: KPIs updated: {[(kpi.get('id'), kpi.get('currentValue')) for kpi in updatedKpis]}") - - # Check if all KPIs completed - allCompleted = True - for kpi in updatedKpis: - targetValue = kpi.get("targetValue", 0) - currentValue = kpi.get("currentValue", 0) - if currentValue < targetValue: - allCompleted = False - break - - if allCompleted: - logger.info(f"Iteration {iteration}: All KPIs completed, finishing accumulation") - wasJsonComplete = True # Mark as complete to exit loop - - # CRITICAL: Handle JSON fragments (continuation content) - # Fragment merging happens inside _extractSectionsFromResponse - # If merge fails (returns wasJsonComplete=True), stop iterations and complete JSON - if not extractedSections and allSections: - if wasJsonComplete: - # Merge failed - stop iterations, complete JSON with available data - logger.error(f"Iteration {iteration}: ❌ MERGE FAILED - Stopping iterations, completing JSON with available data") - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, False) - if operationId: - self.services.chat.progressLogUpdate(operationId, 0.9, f"Merge failed, completing JSON ({iteration} iterations)") - break - - # Fragment was detected and merged successfully - logger.info(f"Iteration {iteration}: JSON fragment detected and merged, continuing") - # Don't break - fragment was merged, continue to get more content if needed - # Check if we should continue based on JSON completeness - shouldContinue = self._shouldContinueGeneration( - allSections, - iteration, - wasJsonComplete, - result - ) - if shouldContinue: - if iterationOperationId: - self.services.chat.progressLogUpdate(iterationOperationId, 0.8, "Fragment merged, continuing") - self.services.chat.progressLogFinish(iterationOperationId, True) - continue - else: - # Done - fragment was merged and JSON is complete - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, True) - if operationId: - 
self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, fragment merged)") - logger.info(f"Generation complete after {iteration} iterations: fragment merged") - break - - # Extract document metadata from first iteration if available - if iteration == 1 and parsedResult and not documentMetadata: - documentMetadata = self._extractDocumentMetadata(parsedResult) - - # Update progress after parsing - if iterationOperationId: - if extractedSections: - self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections") - - if not extractedSections: - # CRITICAL: If JSON was incomplete/broken, continue even if no sections extracted - # This allows the AI to retry and complete the broken JSON - if not wasJsonComplete: - logger.warning(f"Iteration {iteration}: No sections extracted from broken JSON, continuing for another attempt") - continue - # If JSON was complete but no sections extracted - check if it was a fragment - # Fragments are handled above, so if we get here and it's complete, it's an error - logger.warning(f"Iteration {iteration}: No sections extracted from complete JSON, stopping") - break - - # NOTE: Section merging now happens BEFORE KPI validation (see above) - # This ensures sections are preserved even if KPI validation fails - - # Calculate total bytes in merged content for progress display - merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False) - totalBytesGenerated = len(merged_json_str.encode('utf-8')) - - # Update main operation with byte progress - if operationId: - # Format bytes for display - if totalBytesGenerated < 1024: - bytesDisplay = f"{totalBytesGenerated}B" - elif totalBytesGenerated < 1024 * 1024: - bytesDisplay = f"{totalBytesGenerated / 1024:.1f}kB" - else: - bytesDisplay = f"{totalBytesGenerated / (1024 * 1024):.1f}MB" - # Estimate progress based on iterations (rough estimate) - estimatedProgress = min(0.9, 0.4 + (iteration * 
0.1)) - self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})") - - # Log merged sections for debugging - self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}") - - # Check if we should continue (completion detection) - # Simple logic: JSON completeness determines continuation - shouldContinue = self._shouldContinueGeneration( - allSections, - iteration, - wasJsonComplete, - result - ) - - if shouldContinue: - # Finish iteration operation (will continue with next iteration) - if iterationOperationId: - # Show byte progress in iteration completion - iterBytes = len(result.encode('utf-8')) if result else 0 - if iterBytes < 1024: - iterBytesDisplay = f"{iterBytes}B" - elif iterBytes < 1024 * 1024: - iterBytesDisplay = f"{iterBytes / 1024:.1f}kB" - else: - iterBytesDisplay = f"{iterBytes / (1024 * 1024):.1f}MB" - self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Completed ({iterBytesDisplay})") - self.services.chat.progressLogFinish(iterationOperationId, True) - continue - else: - # Done - finish iteration and update main operation - if iterationOperationId: - # Show final byte count - finalBytes = len(merged_json_str.encode('utf-8')) - if finalBytes < 1024: - finalBytesDisplay = f"{finalBytes}B" - elif finalBytes < 1024 * 1024: - finalBytesDisplay = f"{finalBytes / 1024:.1f}kB" - else: - finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB" - self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Complete ({finalBytesDisplay})") - self.services.chat.progressLogFinish(iterationOperationId, True) - if operationId: - # Show final size in main operation - finalBytes = len(merged_json_str.encode('utf-8')) - if finalBytes < 1024: - finalBytesDisplay = f"{finalBytes}B" - elif finalBytes < 1024 * 1024: - finalBytesDisplay = f"{finalBytes / 1024:.1f}kB" - else: - finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB" - 
self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete: {finalBytesDisplay} ({iteration} iterations, {len(allSections)} sections)") - logger.info(f"Generation complete after {iteration} iterations: {len(allSections)} sections") - break - - except Exception as e: - logger.error(f"Error in AI call iteration {iteration}: {str(e)}") - if iterationOperationId: - self.services.chat.progressLogFinish(iterationOperationId, False) - break - - if iteration >= maxIterations: - logger.warning(f"AI call stopped after maximum iterations ({maxIterations})") - - # CRITICAL: Complete any incomplete structures in sections before building final result - # This ensures JSON is properly closed even if merge failed or iterations stopped early - allSections = JsonResponseHandler.completeIncompleteStructures(allSections) - - # Build final result from accumulated sections - final_result = self._buildFinalResultFromSections(allSections, documentMetadata) - - # Write final result to debug file - self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result") - - return final_result - - # JSON merging logic moved to subJsonResponseHandling.py + """Delegate to AiCallLooper.""" + return await self.aiCallLooper.callAiWithLooping( + prompt, options, debugPrefix, promptBuilder, promptArgs, operationId, userPrompt, contentParts + ) async def _defineKpisFromPrompt( self, @@ -618,91 +231,12 @@ Respond with ONLY a JSON object in this exact format: continuationContext: Dict[str, Any], debugPrefix: str = "kpi" ) -> List[Dict[str, Any]]: - """ - Make separate AI call to define KPIs based on user prompt and incomplete JSON. - - Args: - userPrompt: Original user prompt - rawJsonString: Raw JSON string from first iteration response - continuationContext: Continuation context (not used for JSON, kept for compatibility) - - Returns: - List of KPI definitions: [{"id": str, "description": str, "jsonPath": str, "targetValue": int}, ...] 
- """ - # Use raw JSON string from first iteration response - if rawJsonString: - # Remove markdown code fences if present - from modules.shared.jsonUtils import stripCodeFences - incompleteJson = stripCodeFences(rawJsonString.strip()) - else: - incompleteJson = "Not available" - - kpiDefinitionPrompt = f"""Analyze the user request and incomplete JSON to define KPIs (Key Performance Indicators) for tracking progress. - -User Request: -{userPrompt} - -Delivered JSON part: -{incompleteJson} - -Task: Define which JSON items should be tracked to measure completion progress. - -IMPORTANT: Analyze the Delivered JSON part structure to understand what is being tracked: -1. Identify the structure type (table with rows, list with items, etc.) -2. Determine what the jsonPath actually counts (number of rows, number of items, etc.) -3. Calculate targetValue based on what is being tracked, NOT the total quantity requested - -For each trackable item, provide: -- id: Unique identifier (use descriptive name) -- description: What this KPI measures (be specific about what is counted) -- jsonPath: Path to extract value from JSON (use dot notation with array indices, e.g., "documents[0].sections[1].elements[0].rows") -- targetValue: Target value to reach (integer) - MUST match what jsonPath actually tracks (rows count, items count, etc.) 
- -Return ONLY valid JSON in this format: -{{ - "kpis": [ - {{ - "id": "unique_id", - "description": "Description of what is measured", - "jsonPath": "path.to.value", - "targetValue": 0 - }} - ] -}} - -If no trackable items can be identified, return: {{"kpis": []}} -""" - - try: - request = AiCallRequest( - prompt=kpiDefinitionPrompt, - options=AiCallOptions( - operationType=OperationTypeEnum.DATA_ANALYSE, - priority=PriorityEnum.SPEED, - processingMode=ProcessingModeEnum.BASIC - ) - ) - - # Write KPI definition prompt to debug file - self.services.utils.writeDebugFile(kpiDefinitionPrompt, f"{debugPrefix}_kpi_definition_prompt") - - response = await self.callAi(request) - - # Write KPI definition response to debug file - self.services.utils.writeDebugFile(response.content, f"{debugPrefix}_kpi_definition_response") - - # Parse response - extracted = extractJsonString(response.content) - kpiResponse = json.loads(extracted) - - kpiDefinitions = kpiResponse.get("kpis", []) - logger.info(f"Defined {len(kpiDefinitions)} KPIs for tracking") - - return kpiDefinitions - - except Exception as e: - logger.warning(f"Failed to define KPIs: {e}, continuing without KPI tracking") - return [] + """Delegate to AiCallLooper.""" + return await self.aiCallLooper._defineKpisFromPrompt( + userPrompt, rawJsonString, continuationContext, debugPrefix + ) + + # JSON merging logic moved to subJsonResponseHandling.py def _extractSectionsFromResponse( self, @@ -717,106 +251,6 @@ If no trackable items can be identified, return: {{"kpis": []}} result, iteration, debugPrefix, allSections, accumulationState ) - def _extractSectionsFromResponse_OLD( - self, - result: str, - iteration: int, - debugPrefix: str, - allSections: List[Dict[str, Any]] = None, - accumulationState: Optional[JsonAccumulationState] = None - ) -> Tuple[List[Dict[str, Any]], bool, Optional[Dict[str, Any]], Optional[JsonAccumulationState]]: - """ - Extract sections from AI response, handling both valid and broken JSON. 
- - NEW BEHAVIOR: - - First iteration: Check if complete, if not start accumulation - - Subsequent iterations: Accumulate strings, parse when complete - - Returns: - Tuple of: - - sections: Extracted sections - - wasJsonComplete: True if JSON is complete - - parsedResult: Parsed JSON object - - updatedAccumulationState: Updated accumulation state (None if not in accumulation mode) - """ - if allSections is None: - allSections = [] - - if iteration == 1: - # First iteration - check if complete - parsed = None - try: - extracted = extractJsonString(result) - parsed = json.loads(extracted) - - # Check completeness - if JsonResponseHandler.isJsonComplete(parsed): - # Complete JSON - no accumulation needed - sections = extractSectionsFromDocument(parsed) - logger.info(f"Iteration 1: Complete JSON detected, no accumulation needed") - return sections, True, parsed, None # No accumulation - except Exception: - pass - - # Incomplete - try to extract partial sections from broken JSON - logger.info(f"Iteration 1: Incomplete JSON detected, attempting to extract partial sections") - - partialSections = [] - if parsed: - # Try to extract sections from parsed (even if incomplete) - partialSections = extractSectionsFromDocument(parsed) - else: - # Try to repair broken JSON and extract sections - try: - repaired = repairBrokenJson(result) - if repaired: - partialSections = extractSectionsFromDocument(repaired) - parsed = repaired # Use repaired version for accumulation state - except Exception: - pass # If repair fails, continue with empty sections - - - # Define KPIs (async call - need to handle this) - # For now, create accumulation state without KPIs, will be updated after async call - accumulationState = JsonAccumulationState( - accumulatedJsonString=result, - isAccumulationMode=True, - lastParsedResult=parsed, - allSections=partialSections, - kpis=[] - ) - - # Note: KPI definition will be done in the caller (async context) - return partialSections, False, parsed, 
accumulationState - - else: - # Subsequent iterations - accumulate - if accumulationState and accumulationState.isAccumulationMode: - accumulated, sections, isComplete, parsedResult = \ - JsonResponseHandler.accumulateAndParseJsonFragments( - accumulationState.accumulatedJsonString, - result, - allSections, - iteration - ) - - # Update accumulation state - accumulationState.accumulatedJsonString = accumulated - accumulationState.lastParsedResult = parsedResult - accumulationState.allSections = allSections + sections if sections else allSections - accumulationState.isAccumulationMode = not isComplete - - # Log accumulated JSON for debugging - if parsedResult: - accumulated_json_str = json.dumps(parsedResult, indent=2, ensure_ascii=False) - self.services.utils.writeDebugFile(accumulated_json_str, f"{debugPrefix}_accumulated_json_iteration_{iteration}.json") - - return sections, isComplete, parsedResult, accumulationState - else: - # No accumulation mode - process normally (shouldn't happen) - logger.warning(f"Iteration {iteration}: No accumulation state but iteration > 1") - return [], False, None, None - def _shouldContinueGeneration( self, allSections: List[Dict[str, Any]], @@ -829,85 +263,6 @@ If no trackable items can be identified, return: {{"kpis": []}} allSections, iteration, wasJsonComplete, rawResponse ) - def _shouldContinueGeneration_OLD( - self, - allSections: List[Dict[str, Any]], - iteration: int, - wasJsonComplete: bool, - rawResponse: str = None - ) -> bool: - """ - Determine if AI generation loop should continue. - - CRITICAL: This is ONLY about AI Loop Completion, NOT Action DoD! - Action DoD is checked AFTER the AI Loop completes in _refineDecide. - - Simple logic: - - If JSON parsing failed or incomplete → continue (needs more content) - - If JSON parses successfully and is complete → stop (all content delivered) - - Loop detection prevents infinite loops - - CRITICAL: JSON completeness is determined by parsing, NOT by last character check! 
- Returns True if we should continue, False if AI Loop is done. - """ - if len(allSections) == 0: - return True # No sections yet, continue - - # CRITERION 1: If JSON was incomplete/broken (parsing failed or incomplete) - continue to repair/complete - if not wasJsonComplete: - logger.info(f"Iteration {iteration}: JSON incomplete/broken - continuing to complete") - return True - - # CRITERION 2: JSON is complete (parsed successfully) - check for loop detection - if self._isStuckInLoop(allSections, iteration): - logger.warning(f"Iteration {iteration}: Detected potential infinite loop - stopping AI loop") - return False - - # JSON is complete and not stuck in loop - done - logger.info(f"Iteration {iteration}: JSON complete - AI loop done") - return False - - def _isStuckInLoop( - self, - allSections: List[Dict[str, Any]], - iteration: int - ) -> bool: - """ - Detect if we're stuck in a loop (same content being repeated). - - Generic approach: Check if recent iterations are adding minimal or duplicate content. 
- """ - if iteration < 3: - return False # Need at least 3 iterations to detect a loop - - if len(allSections) == 0: - return False - - # Check if last section is very small (might be stuck) - lastSection = allSections[-1] - elements = lastSection.get("elements", []) - - if isinstance(elements, list) and elements: - lastElem = elements[-1] if elements else {} - else: - lastElem = elements if isinstance(elements, dict) else {} - - # Check content size of last section - lastSectionSize = 0 - if isinstance(lastElem, dict): - for key, value in lastElem.items(): - if isinstance(value, str): - lastSectionSize += len(value) - elif isinstance(value, list): - lastSectionSize += len(str(value)) - - # If last section is very small and we've done many iterations, might be stuck - if lastSectionSize < 100 and iteration > 10: - logger.warning(f"Potential loop detected: iteration {iteration}, last section size {lastSectionSize}") - return True - - return False - def _extractDocumentMetadata( self, parsedResult: Dict[str, Any] @@ -915,31 +270,6 @@ If no trackable items can be identified, return: {{"kpis": []}} """Delegate to ResponseParser.""" return self.responseParser.extractDocumentMetadata(parsedResult) - def _extractDocumentMetadata_OLD( - self, - parsedResult: Dict[str, Any] - ) -> Optional[Dict[str, Any]]: - """ - Extract document metadata (title, filename) from parsed AI response. - Returns dict with 'title' and 'filename' keys if found, None otherwise. 
- """ - if not isinstance(parsedResult, dict): - return None - - # Try to get from documents array (preferred structure) - if "documents" in parsedResult and isinstance(parsedResult["documents"], list) and len(parsedResult["documents"]) > 0: - firstDoc = parsedResult["documents"][0] - if isinstance(firstDoc, dict): - title = firstDoc.get("title") - filename = firstDoc.get("filename") - if title or filename: - return { - "title": title, - "filename": filename - } - - return None - def _buildFinalResultFromSections( self, allSections: List[Dict[str, Any]], @@ -948,47 +278,6 @@ If no trackable items can be identified, return: {{"kpis": []}} """Delegate to ResponseParser.""" return self.responseParser.buildFinalResultFromSections(allSections, documentMetadata) - def _buildFinalResultFromSections_OLD( - self, - allSections: List[Dict[str, Any]], - documentMetadata: Optional[Dict[str, Any]] = None - ) -> str: - """ - Build final JSON result from accumulated sections. - Uses AI-provided metadata (title, filename) if available. 
- """ - if not allSections: - return "" - - # Extract metadata from AI response if available - title = "Generated Document" - filename = "document.json" - if documentMetadata: - if documentMetadata.get("title"): - title = documentMetadata["title"] - if documentMetadata.get("filename"): - filename = documentMetadata["filename"] - - # Build documents structure - # Assuming single document for now - documents = [{ - "id": "doc_1", - "title": title, - "filename": filename, - "sections": allSections - }] - - result = { - "metadata": { - "split_strategy": "single_document", - "source_documents": [], - "extraction_method": "ai_generation" - }, - "documents": documents - } - - return json.dumps(result, indent=2) - # Public API Methods # Planning AI Call @@ -1163,412 +452,6 @@ If no trackable items can be identified, return: {{"kpis": []}} documents, userPrompt, actionParameters, parentOperationId ) - async def _clarifyDocumentIntents_OLD( - self, - documents: List[ChatDocument], - userPrompt: str, - actionParameters: Dict[str, Any], - parentOperationId: str - ) -> List[DocumentIntent]: - """ - Phase 5A: Analysiert, welche Dokumente Extraktion vs Referenz benötigen. - Gibt DocumentIntent für jedes Dokument zurück. - - Args: - documents: Liste der zu verarbeitenden Dokumente - userPrompt: User-Anfrage - actionParameters: Action-spezifische Parameter (z.B. 
resultType, outputFormat) - parentOperationId: Parent Operation-ID für ChatLog-Hierarchie - - Returns: - Liste von DocumentIntent-Objekten - """ - from modules.datamodels.datamodelChat import ChatDocument - - # Erstelle Operation-ID für Intent-Analyse - intentOperationId = f"{parentOperationId}_intent_analysis" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - intentOperationId, - "Document Intent Analysis", - "Intent Analysis", - f"Analyzing {len(documents)} documents", - parentOperationId=parentOperationId - ) - - try: - # Mappe pre-extracted JSONs zu ursprünglichen Dokument-IDs für Intent-Analyse - documentMapping = {} # Maps original doc ID -> JSON doc ID - resolvedDocuments = [] - - for doc in documents: - preExtracted = self._resolvePreExtractedDocument(doc) - if preExtracted: - originalDocId = preExtracted["originalDocument"]["id"] - documentMapping[originalDocId] = doc.id - # Erstelle temporäres ChatDocument für ursprüngliches Dokument - from modules.datamodels.datamodelChat import ChatDocument - originalDoc = ChatDocument( - id=originalDocId, - fileName=preExtracted["originalDocument"]["fileName"], - mimeType=preExtracted["originalDocument"]["mimeType"], - fileSize=preExtracted["originalDocument"].get("fileSize", doc.fileSize), - fileId=doc.fileId, # Behalte fileId vom JSON - messageId=doc.messageId if hasattr(doc, 'messageId') else None # Behalte messageId falls vorhanden - ) - resolvedDocuments.append(originalDoc) - else: - resolvedDocuments.append(doc) - - # Baue Intent-Analyse-Prompt mit ursprünglichen Dokumenten - intentPrompt = self._buildIntentAnalysisPrompt(userPrompt, resolvedDocuments, actionParameters) - - # AI-Call (verwende callAiPlanning für einfache JSON-Responses) - # Debug-Logs werden bereits von callAiPlanning geschrieben - aiResponse = await self.callAiPlanning( - prompt=intentPrompt, - debugType="document_intent_analysis" - ) - - # Parse Result und mappe zurück zu JSON-Dokument-IDs falls nötig - 
intentsData = json.loads(self.services.utils.jsonExtractString(aiResponse)) - documentIntents = [] - for intent in intentsData.get("intents", []): - docId = intent.get("documentId") - # Wenn Intent für ursprüngliches Dokument, mappe zurück zu JSON-Dokument-ID - if docId in documentMapping: - intent["documentId"] = documentMapping[docId] - documentIntents.append(DocumentIntent(**intent)) - - # Debug-Log (harmonisiert) - self.services.utils.writeDebugFile( - json.dumps([intent.dict() for intent in documentIntents], indent=2), - "document_intent_analysis_result" - ) - - # ChatLog abschließen - self.services.chat.progressLogFinish(intentOperationId, True) - - return documentIntents - - except Exception as e: - self.services.chat.progressLogFinish(intentOperationId, False) - logger.error(f"Error in _clarifyDocumentIntents: {str(e)}") - raise - - def _resolvePreExtractedDocument(self, document: ChatDocument) -> Optional[Dict[str, Any]]: - """ - Prüft ob ein JSON-Dokument bereits extrahierte ContentParts enthält. - Gibt Dict zurück mit: - - originalDocument: ChatDocument-Info des ursprünglichen Dokuments - - contentExtracted: ContentExtracted-Objekt mit Parts - - parts: Liste der ContentParts - - Returns None wenn kein pre-extracted Format erkannt wird. 
- """ - if document.mimeType != "application/json": - logger.debug(f"Document {document.id} is not JSON (mimeType={document.mimeType}), skipping pre-extracted check") - return None - - try: - docBytes = self.services.interfaceDbComponent.getFileData(document.fileId) - if not docBytes: - return None - - docData = docBytes.decode('utf-8') - jsonData = json.loads(docData) - - if not isinstance(jsonData, dict): - return None - - # Check for ContentExtracted format - # Nur Format 1 (ActionDocument-Format mit validationMetadata) wird unterstützt - documentData = None - - validationMetadata = jsonData.get("validationMetadata", {}) - actionType = validationMetadata.get("actionType") - logger.debug(f"JSON document {document.id}: validationMetadata.actionType={actionType}, keys={list(jsonData.keys())}") - - if actionType == "context.extractContent": - # Format: {"validationMetadata": {"actionType": "context.extractContent"}, "documentData": {...}} - documentData = jsonData.get("documentData") - logger.debug(f"Found ContentExtracted via validationMetadata for {document.fileName}, documentData keys: {list(documentData.keys()) if documentData else None}") - else: - logger.debug(f"JSON document {document.id} does not have actionType='context.extractContent' (got: {actionType})") - - if documentData: - from modules.datamodels.datamodelExtraction import ContentExtracted - - try: - # Stelle sicher, dass "id" vorhanden ist - if "id" not in documentData: - documentData["id"] = document.id - - contentExtracted = ContentExtracted(**documentData) - - if contentExtracted.parts: - # Extrahiere ursprüngliche Dokument-Info aus den Parts - originalDocId = None - originalFileName = None - originalMimeType = None - - for part in contentExtracted.parts: - if part.metadata: - # Versuche ursprüngliche Dokument-Info zu finden - if not originalDocId and part.metadata.get("documentId"): - originalDocId = part.metadata.get("documentId") - if not originalFileName and 
part.metadata.get("originalFileName"): - originalFileName = part.metadata.get("originalFileName") - if not originalMimeType and part.metadata.get("documentMimeType"): - originalMimeType = part.metadata.get("documentMimeType") - - # Falls nicht gefunden, versuche aus documentName zu extrahieren - if not originalFileName: - # Versuche aus documentName zu extrahieren (z.B. "B2025-02c_28_extracted_...json" -> "B2025-02c_28.pdf") - if document.fileName and "_extracted_" in document.fileName: - originalFileName = document.fileName.split("_extracted_")[0] + ".pdf" - - return { - "originalDocument": { - "id": originalDocId or document.id, - "fileName": originalFileName or document.fileName, - "mimeType": originalMimeType or "application/pdf", - "fileSize": document.fileSize - }, - "contentExtracted": contentExtracted, - "parts": contentExtracted.parts - } - except Exception as parseError: - logger.warning(f"Could not parse ContentExtracted format from {document.fileName}: {str(parseError)}") - logger.debug(f"JSON keys: {list(jsonData.keys())}, has parts: {'parts' in jsonData}") - import traceback - logger.debug(f"Parse error traceback: {traceback.format_exc()}") - return None - else: - logger.debug(f"JSON document {document.id} has no documentData (actionType={actionType})") - - return None - except Exception as e: - logger.debug(f"Error resolving pre-extracted document {document.fileName}: {str(e)}") - return None - - async def _extractTextFromImage(self, imagePart: ContentPart, extractionPrompt: str) -> Optional[str]: - """ - Extrahiere Text aus einem Image-Part mit Vision AI. - - Args: - imagePart: ContentPart mit typeGroup="image" - extractionPrompt: Prompt für die Text-Extraktion - - Returns: - Extrahierter Text oder None bei Fehler - """ - try: - from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum - - # Final extraction prompt - finalPrompt = extractionPrompt or "Extract all text content from this image. 
Return only the extracted text, no additional formatting." - - # Debug-Log (harmonisiert) - self.services.utils.writeDebugFile( - finalPrompt, - f"content_extraction_prompt_image_{imagePart.id}" - ) - - # Erstelle AI-Call-Request mit Image-Part - request = AiCallRequest( - prompt=finalPrompt, - context="", - options=AiCallOptions(operationType=OperationTypeEnum.IMAGE_ANALYSE), - contentParts=[imagePart] - ) - - # Verwende AI-Service für Vision AI-Verarbeitung - response = await self.services.ai.callAi(request) - - # Debug-Log für Response (harmonisiert) - if response and response.content: - self.services.utils.writeDebugFile( - response.content, - f"content_extraction_response_image_{imagePart.id}" - ) - - if response and response.content: - return response.content.strip() - - # Kein Content zurückgegeben - return error message für Debugging - errorMsg = f"Vision AI extraction failed: No content returned for image {imagePart.id}" - logger.warning(errorMsg) - return f"[ERROR: {errorMsg}]" - except Exception as e: - errorMsg = f"Vision AI extraction failed for image {imagePart.id}: {str(e)}" - logger.error(errorMsg) - import traceback - logger.debug(f"Traceback: {traceback.format_exc()}") - # Return error message statt None für Debugging - return f"[ERROR: {errorMsg}]" - - async def _processTextContentWithAi(self, textPart: ContentPart, extractionPrompt: str) -> Optional[str]: - """ - Verarbeite Text-Content mit AI basierend auf extractionPrompt. - - WICHTIG: Pre-extracted ContentParts von context.extractContent enthalten RAW extrahierten Text - (z.B. aus PDF-Text-Layer). Wenn "extract" Intent vorhanden ist, muss dieser Text mit AI - verarbeitet werden (Transformation, Strukturierung, etc.) basierend auf extractionPrompt. 
- - Args: - textPart: ContentPart mit typeGroup="text" (oder anderer Text-basierter Typ) - extractionPrompt: Prompt für die AI-Verarbeitung des Textes - - Returns: - AI-verarbeiteter Text oder None bei Fehler - """ - try: - from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum - - # Final extraction prompt - finalPrompt = extractionPrompt or "Process and extract the key information from the following text content." - - # Debug-Log (harmonisiert) - log prompt with text preview - textPreview = textPart.data[:500] + "..." if textPart.data and len(textPart.data) > 500 else (textPart.data or "") - promptWithContext = f"{finalPrompt}\n\n--- Text Content (preview) ---\n{textPreview}" - self.services.utils.writeDebugFile( - promptWithContext, - f"content_extraction_prompt_text_{textPart.id}" - ) - - # Erstelle Text-ContentPart für AI-Verarbeitung - # Verwende den vorhandenen Text als Input - textContentPart = ContentPart( - id=textPart.id, - label=textPart.label, - typeGroup="text", - mimeType="text/plain", - data=textPart.data if textPart.data else "", - metadata=textPart.metadata.copy() if textPart.metadata else {} - ) - - # Erstelle AI-Call-Request mit Text-Part - request = AiCallRequest( - prompt=finalPrompt, - context="", - options=AiCallOptions(operationType=OperationTypeEnum.DATA_EXTRACT), - contentParts=[textContentPart] - ) - - # Verwende AI-Service für Text-Verarbeitung - response = await self.services.ai.callAi(request) - - # Debug-Log für Response (harmonisiert) - if response and response.content: - self.services.utils.writeDebugFile( - response.content, - f"content_extraction_response_text_{textPart.id}" - ) - - if response and response.content: - return response.content.strip() - - # Kein Content zurückgegeben - return error message für Debugging - errorMsg = f"AI text processing failed: No content returned for text part {textPart.id}" - logger.warning(errorMsg) - return f"[ERROR: {errorMsg}]" - except Exception as e: - 
errorMsg = f"AI text processing failed for text part {textPart.id}: {str(e)}" - logger.error(errorMsg) - import traceback - logger.debug(f"Traceback: {traceback.format_exc()}") - # Return error message statt None für Debugging - return f"[ERROR: {errorMsg}]" - - def _buildIntentAnalysisPrompt( - self, - userPrompt: str, - documents: List[ChatDocument], - actionParameters: Dict[str, Any] - ) -> str: - """Baue Prompt für Intent-Analyse.""" - # Baue Dokument-Liste - zeige ursprüngliche Dokumente für pre-extracted JSONs - docListText = "" - for i, doc in enumerate(documents, 1): - # Prüfe ob es ein pre-extracted JSON ist - preExtracted = self._resolvePreExtractedDocument(doc) - - if preExtracted: - # Zeige ursprüngliches Dokument statt JSON - originalDoc = preExtracted["originalDocument"] - partsInfo = f" (contains {len(preExtracted['parts'])} pre-extracted parts: {', '.join([p.typeGroup for p in preExtracted['parts'] if p.data and len(str(p.data)) > 0])})" - docListText += f"\n{i}. Document ID: {originalDoc['id']}\n" - docListText += f" File Name: {originalDoc['fileName']}{partsInfo}\n" - docListText += f" MIME Type: {originalDoc['mimeType']}\n" - docListText += f" File Size: {originalDoc.get('fileSize', doc.fileSize)} bytes\n" - else: - # Normales Dokument - docListText += f"\n{i}. Document ID: {doc.id}\n" - docListText += f" File Name: {doc.fileName}\n" - docListText += f" MIME Type: {doc.mimeType}\n" - docListText += f" File Size: {doc.fileSize} bytes\n" - - outputFormat = actionParameters.get("outputFormat", "txt") - - prompt = f"""USER REQUEST: -{userPrompt} - -DOCUMENTS TO ANALYZE: -{docListText} - -TASK: For each document, determine its intents (can be multiple): -- "extract": Content extraction needed (text, structure, OCR, etc.) 
-- "render": Image/binary should be rendered as-is (visual element) -- "reference": Document reference/attachment (no extraction, just reference) - -OUTPUT FORMAT: {outputFormat} - -RETURN JSON: -{{ - "intents": [ - {{ - "documentId": "doc_1", - "intents": ["extract"], # Array - can contain multiple! - "extractionPrompt": "Extract all text content, preserving structure", - "reasoning": "User needs text content for document generation" - }}, - {{ - "documentId": "doc_2", - "intents": ["extract", "render"], # Both! Image needs text extraction AND visual rendering - "extractionPrompt": "Extract text content from image using vision AI", - "reasoning": "Image contains text that needs extraction, but also should be rendered visually" - }}, - {{ - "documentId": "doc_3", - "intents": ["reference"], - "extractionPrompt": null, - "reasoning": "Document is only used as reference, no extraction needed" - }} - ] -}} - -CRITICAL RULES: -1. For images (mimeType starts with "image/"): - - If user wants to "include" or "show" images → add "render" - - If user wants to "analyze", "read text", or "extract text" from images → add "extract" - - Can have BOTH "extract" and "render" if image needs both text extraction and visual rendering - -2. For text documents: - - If user mentions "template" or "structure" → "reference" or "extract" based on context - - If user mentions "reference" or "context" → "reference" - - Default → "extract" - -3. Consider output format: - - For formats like PDF, DOCX, PPTX: images usually need "render" - - For formats like CSV, JSON: usually "extract" only - - For HTML: can have both "extract" and "render" - -Return ONLY valid JSON following the structure above. -""" - return prompt - async def _extractAndPrepareContent( self, documents: List[ChatDocument], @@ -1580,514 +463,6 @@ Return ONLY valid JSON following the structure above. 
documents, documentIntents, parentOperationId, self._getIntentForDocument ) - async def _extractAndPrepareContent_OLD( - self, - documents: List[ChatDocument], - documentIntents: List[DocumentIntent], - parentOperationId: str - ) -> List[ContentPart]: - """ - Phase 5B: Extrahiert Content basierend auf Intents und bereitet ContentParts mit Metadaten vor. - Gibt Liste von ContentParts im passenden Format zurück. - - WICHTIG: Ein Dokument kann mehrere ContentParts erzeugen, wenn mehrere Intents vorhanden sind. - Beispiel: Bild mit intents=["extract", "render"] erzeugt: - - ContentPart(contentFormat="object", ...) für Rendering - - ContentPart(contentFormat="extracted", ...) für Text-Analyse - - Args: - documents: Liste der zu verarbeitenden Dokumente - documentIntents: Liste von DocumentIntent-Objekten - parentOperationId: Parent Operation-ID für ChatLog-Hierarchie - - Returns: - Liste von ContentParts mit vollständigen Metadaten - """ - # Erstelle Operation-ID für Extraktion - extractionOperationId = f"{parentOperationId}_content_extraction" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - extractionOperationId, - "Content Extraction", - "Extraction", - f"Extracting from {len(documents)} documents", - parentOperationId=parentOperationId - ) - - try: - allContentParts = [] - - for document in documents: - # Check if document is already a ContentExtracted document (pre-extracted JSON) - logger.debug(f"Checking document {document.id} ({document.fileName}, mimeType={document.mimeType}) for pre-extracted content") - preExtracted = self._resolvePreExtractedDocument(document) - - if preExtracted: - logger.info(f"✅ Found pre-extracted document: {document.fileName} -> Original: {preExtracted['originalDocument']['fileName']}") - logger.info(f" Pre-extracted document ID: {document.id}, Original document ID: {preExtracted['originalDocument']['id']}") - logger.info(f" ContentParts count: {len(preExtracted['contentExtracted'].parts) if 
preExtracted['contentExtracted'].parts else 0}") - - # Verwende bereits extrahierte ContentParts direkt - contentExtracted = preExtracted["contentExtracted"] - - # WICHTIG: Intent muss für das JSON-Dokument gefunden werden, nicht für das Original - # (Intent-Analyse mappt bereits zurück zu JSON-Dokument-ID) - intent = self._getIntentForDocument(document.id, documentIntents) - logger.info(f" Intent lookup for document {document.id}: found={intent is not None}") - if intent: - logger.info(f" Intent: {intent.intents}, extractionPrompt: {intent.extractionPrompt[:100] if intent.extractionPrompt else None}...") - else: - logger.warning(f" ⚠️ No intent found for pre-extracted document {document.id}! Available intent documentIds: {[i.documentId for i in documentIntents]}") - - if contentExtracted.parts: - for part in contentExtracted.parts: - # Überspringe leere Parts (Container ohne Daten) - if not part.data or (isinstance(part.data, str) and len(part.data.strip()) == 0): - if part.typeGroup == "container": - continue # Überspringe leere Container - - if not part.metadata: - part.metadata = {} - - # Ensure metadata is complete - if "documentId" not in part.metadata: - part.metadata["documentId"] = document.id - - # WICHTIG: Prüfe Intent für dieses Part - partIntent = intent.intents if intent else ["extract"] - - # Debug-Logging für Intent-Verarbeitung - logger.debug(f"Processing part {part.id}: typeGroup={part.typeGroup}, intents={partIntent}, hasData={bool(part.data)}, dataLength={len(str(part.data)) if part.data else 0}") - - # WICHTIG: Ein Part kann mehrere Intents haben - erstelle für jeden Intent einen ContentPart - # Generische Intent-Verarbeitung für ALLE Content-Typen - hasReferenceIntent = "reference" in partIntent - hasRenderIntent = "render" in partIntent - hasExtractIntent = "extract" in partIntent - hasPartData = bool(part.data) and (not isinstance(part.data, str) or len(part.data.strip()) > 0) - - logger.debug(f"Part {part.id}: 
reference={hasReferenceIntent}, render={hasRenderIntent}, extract={hasExtractIntent}, hasData={hasPartData}") - - # Track ob der originale Part bereits hinzugefügt wurde - originalPartAdded = False - - # 1. Reference Intent: Erstelle Reference ContentPart - if hasReferenceIntent: - referencePart = ContentPart( - id=f"ref_{document.id}_{part.id}", - label=f"Reference: {part.label or 'Content'}", - typeGroup="reference", - mimeType=part.mimeType or "application/octet-stream", - data="", # Leer - nur Referenz - metadata={ - "contentFormat": "reference", - "documentId": document.id, - "documentReference": f"docItem:{document.id}:{preExtracted['originalDocument']['fileName']}", - "intent": "reference", - "usageHint": f"Reference: {preExtracted['originalDocument']['fileName']}", - "originalFileName": preExtracted["originalDocument"]["fileName"] - } - ) - allContentParts.append(referencePart) - logger.debug(f"✅ Created reference ContentPart for {part.id}") - - # 2. Render Intent: Erstelle Object ContentPart (für Binary/Image Rendering) - if hasRenderIntent and hasPartData: - # Prüfe ob es ein Binary/Image ist (kann gerendert werden) - isRenderable = ( - part.typeGroup == "image" or - part.typeGroup == "binary" or - (part.mimeType and ( - part.mimeType.startswith("image/") or - part.mimeType.startswith("video/") or - part.mimeType.startswith("audio/") or - self._isBinary(part.mimeType) - )) - ) - - if isRenderable: - objectPart = ContentPart( - id=f"obj_{document.id}_{part.id}", - label=f"Object: {part.label or 'Content'}", - typeGroup=part.typeGroup, - mimeType=part.mimeType or "application/octet-stream", - data=part.data, # Base64/Binary data ist bereits vorhanden - metadata={ - "contentFormat": "object", - "documentId": document.id, - "intent": "render", - "usageHint": f"Render as visual element: {preExtracted['originalDocument']['fileName']}", - "originalFileName": preExtracted["originalDocument"]["fileName"], - "relatedExtractedPartId": 
f"extracted_{document.id}_{part.id}" if hasExtractIntent else None - } - ) - allContentParts.append(objectPart) - logger.debug(f"✅ Created object ContentPart for {part.id} (render intent)") - else: - logger.warning(f"⚠️ Part {part.id} has render intent but is not renderable (typeGroup={part.typeGroup}, mimeType={part.mimeType})") - elif hasRenderIntent and not hasPartData: - logger.warning(f"⚠️ Part {part.id} has render intent but no data, skipping render part") - - # 3. Extract Intent: Erstelle Extracted ContentPart (möglicherweise mit zusätzlicher Verarbeitung) - if hasExtractIntent: - # Spezielle Behandlung für Images: Vision AI für Text-Extraktion - if part.typeGroup == "image" and hasPartData: - logger.info(f"🔄 Processing image {part.id} with Vision AI (extract intent)") - try: - extractionPrompt = intent.extractionPrompt if intent and intent.extractionPrompt else "Extract all text content from this image. Return only the extracted text, no additional formatting." - extractedText = await self._extractTextFromImage(part, extractionPrompt) - if extractedText: - # Prüfe ob es ein Error-Message ist - isError = extractedText.startswith("[ERROR:") - - # Erstelle neuen Text-Part mit extrahiertem Text oder Error-Message - textPart = ContentPart( - id=f"extracted_{document.id}_{part.id}", - label=f"Extracted text from {part.label or 'Image'}" if not isError else f"Error extracting from {part.label or 'Image'}", - typeGroup="text", - mimeType="text/plain", - data=extractedText, - metadata={ - "contentFormat": "extracted", - "documentId": document.id, - "intent": "extract", - "originalFileName": preExtracted["originalDocument"]["fileName"], - "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None, - "extractionPrompt": extractionPrompt, - "extractionMethod": "vision", - "isError": isError - } - ) - allContentParts.append(textPart) - if isError: - logger.error(f"❌ Vision AI extraction failed for image {part.id}: {extractedText}") - else: - 
logger.info(f"✅ Extracted text from image {part.id} using Vision AI: {len(extractedText)} chars") - else: - # Sollte nicht vorkommen (Funktion gibt jetzt immer Error-Message zurück) - errorMsg = f"Vision AI extraction failed: Unexpected empty response for image {part.id}" - logger.error(errorMsg) - errorPart = ContentPart( - id=f"extracted_{document.id}_{part.id}", - label=f"Error extracting from {part.label or 'Image'}", - typeGroup="text", - mimeType="text/plain", - data=f"[ERROR: {errorMsg}]", - metadata={ - "contentFormat": "extracted", - "documentId": document.id, - "intent": "extract", - "originalFileName": preExtracted["originalDocument"]["fileName"], - "extractionPrompt": extractionPrompt, - "extractionMethod": "vision", - "isError": True - } - ) - allContentParts.append(errorPart) - except Exception as e: - logger.error(f"❌ Failed to extract text from image {part.id}: {str(e)}") - import traceback - logger.debug(f"Traceback: {traceback.format_exc()}") - # Kein Fallback: Wenn render Intent vorhanden, haben wir bereits object Part - # Wenn nur extract Intent: Original Part ist kein Text, daher nicht als extracted hinzufügen - if not hasRenderIntent: - logger.debug(f"Image {part.id} has only extract intent, Vision AI failed - no extracted text available") - else: - # Für alle anderen Content-Typen: Prüfe ob AI-Verarbeitung benötigt wird - # WICHTIG: Pre-extracted ContentParts von context.extractContent enthalten RAW extrahierten Content - # (z.B. Text aus PDF-Text-Layer, Tabellen, etc.). Wenn "extract" Intent vorhanden ist, - # muss dieser Content mit AI verarbeitet werden basierend auf extractionPrompt. 
- - # Prüfe ob Part Text-Content hat (kann mit AI verarbeitet werden) - isTextContent = ( - part.typeGroup == "text" or - part.typeGroup == "table" or - (part.data and isinstance(part.data, str) and len(part.data.strip()) > 0) - ) - - if isTextContent and intent and intent.extractionPrompt: - # Text-Content mit extractionPrompt: Verarbeite mit AI - logger.info(f"🔄 Processing text content {part.id} with AI (extract intent with prompt)") - try: - extractionPrompt = intent.extractionPrompt - processedText = await self._processTextContentWithAi(part, extractionPrompt) - if processedText: - # Prüfe ob es ein Error-Message ist - isError = processedText.startswith("[ERROR:") - - # Erstelle neuen Text-Part mit AI-verarbeitetem Text oder Error-Message - processedPart = ContentPart( - id=f"extracted_{document.id}_{part.id}", - label=f"AI-processed: {part.label or 'Content'}" if not isError else f"Error processing {part.label or 'Content'}", - typeGroup="text", - mimeType="text/plain", - data=processedText, - metadata={ - "contentFormat": "extracted", - "documentId": document.id, - "intent": "extract", - "originalFileName": preExtracted["originalDocument"]["fileName"], - "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None, - "extractionPrompt": extractionPrompt, - "extractionMethod": "ai", - "sourcePartId": part.id, - "fromExtractContent": True, - "isError": isError - } - ) - allContentParts.append(processedPart) - originalPartAdded = True - if isError: - logger.error(f"❌ AI text processing failed for part {part.id}: {processedText}") - else: - logger.info(f"✅ Processed text content {part.id} with AI: {len(processedText)} chars") - else: - # Sollte nicht vorkommen (Funktion gibt jetzt immer Error-Message zurück) - errorMsg = f"AI text processing failed: Unexpected empty response for part {part.id}" - logger.error(errorMsg) - errorPart = ContentPart( - id=f"extracted_{document.id}_{part.id}", - label=f"Error processing {part.label or 'Content'}", 
- typeGroup="text", - mimeType="text/plain", - data=f"[ERROR: {errorMsg}]", - metadata={ - "contentFormat": "extracted", - "documentId": document.id, - "intent": "extract", - "originalFileName": preExtracted["originalDocument"]["fileName"], - "extractionPrompt": extractionPrompt, - "extractionMethod": "ai", - "sourcePartId": part.id, - "isError": True - } - ) - allContentParts.append(errorPart) - originalPartAdded = True - except Exception as e: - logger.error(f"❌ Failed to process text content {part.id} with AI: {str(e)}") - import traceback - logger.debug(f"Traceback: {traceback.format_exc()}") - # Fallback: Verwende Original-Part - if not originalPartAdded: - part.metadata.update({ - "contentFormat": "extracted", - "intent": "extract", - "fromExtractContent": True, - "skipExtraction": True, - "originalFileName": preExtracted["originalDocument"]["fileName"], - "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None - }) - allContentParts.append(part) - originalPartAdded = True - else: - # Kein extractionPrompt oder kein Text-Content: Verwende Part direkt als extracted - # (Content ist bereits extrahiert von context.extractContent, keine weitere AI-Verarbeitung nötig) - # WICHTIG: Nur hinzufügen wenn noch nicht hinzugefügt (z.B. durch render Intent) - if not originalPartAdded: - part.metadata.update({ - "contentFormat": "extracted", - "intent": "extract", - "fromExtractContent": True, - "skipExtraction": True, # Bereits extrahiert - "originalFileName": preExtracted["originalDocument"]["fileName"], - "relatedObjectPartId": f"obj_{document.id}_{part.id}" if hasRenderIntent else None - }) - # Stelle sicher dass contentFormat gesetzt ist - if "contentFormat" not in part.metadata: - part.metadata["contentFormat"] = "extracted" - allContentParts.append(part) - originalPartAdded = True - logger.debug(f"✅ Using pre-extracted ContentPart {part.id} as extracted (no AI processing needed)") - - # 4. 
Fallback: Wenn kein Intent vorhanden oder Part wurde noch nicht hinzugefügt - # (sollte normalerweise nicht vorkommen, da default "extract" ist) - if not hasReferenceIntent and not hasRenderIntent and not hasExtractIntent and not originalPartAdded: - logger.warning(f"⚠️ Part {part.id} has no recognized intents, adding as extracted by default") - part.metadata.update({ - "contentFormat": "extracted", - "intent": "extract", - "fromExtractContent": True, - "skipExtraction": True, - "originalFileName": preExtracted["originalDocument"]["fileName"] - }) - allContentParts.append(part) - originalPartAdded = True - - logger.info(f"✅ Using {len([p for p in contentExtracted.parts if p.data and len(str(p.data)) > 0])} pre-extracted ContentParts from ContentExtracted document {document.fileName}") - logger.info(f" Original document: {preExtracted['originalDocument']['fileName']}") - continue # Skip normal extraction for this document - - # Check if it's standardized JSON format (has "documents" or "sections") - if document.mimeType == "application/json": - try: - docBytes = self.services.interfaceDbComponent.getFileData(document.fileId) - if docBytes: - docData = docBytes.decode('utf-8') - jsonData = json.loads(docData) - - if isinstance(jsonData, dict) and ("documents" in jsonData or "sections" in jsonData): - logger.info(f"Document is already in standardized JSON format, using as reference") - # Create reference ContentPart for structured JSON - contentPart = ContentPart( - id=f"ref_{document.id}", - label=f"Reference: {document.fileName}", - typeGroup="structure", - mimeType="application/json", - data=docData, - metadata={ - "contentFormat": "reference", - "documentId": document.id, - "documentReference": f"docItem:{document.id}:{document.fileName}", - "skipExtraction": True, - "intent": "reference" - } - ) - allContentParts.append(contentPart) - logger.info(f"✅ Using JSON document directly without extraction") - continue # Skip normal extraction for this document - except 
Exception as e: - logger.warning(f"Could not parse JSON document {document.fileName}, will extract normally: {str(e)}") - # Continue with normal extraction - - # Normal extraction path - intent = self._getIntentForDocument(document.id, documentIntents) - - if not intent: - # Default: extract für alle Dokumente ohne Intent - logger.warning(f"No intent found for document {document.id}, using default 'extract'") - intent = DocumentIntent( - documentId=document.id, - intents=["extract"], - extractionPrompt="Extract all content from the document", - reasoning="Default intent: no specific intent found" - ) - - # WICHTIG: Prüfe alle Intents - ein Dokument kann mehrere ContentParts erzeugen - - if "reference" in intent.intents: - # Erstelle Reference ContentPart - contentPart = ContentPart( - id=f"ref_{document.id}", - label=f"Reference: {document.fileName}", - typeGroup="reference", - mimeType=document.mimeType, - data="", - metadata={ - "contentFormat": "reference", - "documentId": document.id, - "documentReference": f"docItem:{document.id}:{document.fileName}", - "intent": "reference", - "usageHint": f"Reference document: {document.fileName}" - } - ) - allContentParts.append(contentPart) - - # WICHTIG: "render" und "extract" können beide vorhanden sein! 
- # In diesem Fall erzeugen wir BEIDE ContentParts - - if "render" in intent.intents: - # Für Images/Binary: extrahiere als Object - if document.mimeType.startswith("image/") or self._isBinary(document.mimeType): - try: - # Lade Binary-Daten (getFileData ist nicht async - keine await nötig) - binaryData = self.services.interfaceDbComponent.getFileData(document.fileId) - if not binaryData: - logger.warning(f"No binary data found for document {document.id}") - continue - base64Data = base64.b64encode(binaryData).decode('utf-8') - - contentPart = ContentPart( - id=f"obj_{document.id}", - label=f"Object: {document.fileName}", - typeGroup="image" if document.mimeType.startswith("image/") else "binary", - mimeType=document.mimeType, - data=base64Data, - metadata={ - "contentFormat": "object", - "documentId": document.id, - "intent": "render", - "usageHint": f"Render as visual element: {document.fileName}", - "originalFileName": document.fileName, - # Verknüpfung zu extracted Part (falls vorhanden) - "relatedExtractedPartId": f"ext_{document.id}" if "extract" in intent.intents else None - } - ) - allContentParts.append(contentPart) - except Exception as e: - logger.error(f"Failed to load binary data for document {document.id}: {str(e)}") - - if "extract" in intent.intents: - # Extrahiere Content mit Extraction Service - extractionPrompt = intent.extractionPrompt or "Extract all content from the document" - - # Debug-Log (harmonisiert) - self.services.utils.writeDebugFile( - extractionPrompt, - f"content_extraction_prompt_{document.id}" - ) - - # Führe Extraktion aus - from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy - - extractionOptions = ExtractionOptions( - prompt=extractionPrompt, - mergeStrategy=MergeStrategy() - ) - - # extractContent ist nicht async - keine await nötig - extractedResults = self.services.extraction.extractContent( - [document], - extractionOptions, - operationId=extractionOperationId, - 
parentOperationId=extractionOperationId - ) - - # Konvertiere extrahierte Ergebnisse zu ContentParts mit Metadaten - for extracted in extractedResults: - for part in extracted.parts: - # Markiere als extracted Format - part.metadata.update({ - "contentFormat": "extracted", - "documentId": document.id, - "extractionPrompt": extractionPrompt, - "intent": "extract", - "usageHint": f"Use extracted content from {document.fileName}", - # Verknüpfung zu object Part (falls vorhanden) - "relatedObjectPartId": f"obj_{document.id}" if "render" in intent.intents else None - }) - # Stelle sicher, dass ID eindeutig ist (falls object Part existiert) - if "render" in intent.intents: - part.id = f"ext_{document.id}_{part.id}" - allContentParts.append(part) - - # Debug-Log (harmonisiert) - self.services.utils.writeDebugFile( - json.dumps([part.dict() for part in allContentParts], indent=2, default=str), - "content_extraction_result" - ) - - # ChatLog abschließen - self.services.chat.progressLogFinish(extractionOperationId, True) - - return allContentParts - - except Exception as e: - self.services.chat.progressLogFinish(extractionOperationId, False) - logger.error(f"Error in _extractAndPrepareContent: {str(e)}") - raise - - def _isBinary(self, mimeType: str) -> bool: - """Prüfe ob MIME-Type binary ist.""" - binaryTypes = [ - "application/octet-stream", - "application/pdf", - "application/zip", - "application/x-zip-compressed" - ] - return mimeType in binaryTypes or mimeType.startswith("image/") or mimeType.startswith("video/") or mimeType.startswith("audio/") - async def _generateStructure( self, userPrompt: str, @@ -2100,209 +475,6 @@ Return ONLY valid JSON following the structure above. userPrompt, contentParts, outputFormat, parentOperationId ) - async def _generateStructure_OLD( - self, - userPrompt: str, - contentParts: List[ContentPart], - outputFormat: str, - parentOperationId: str - ) -> Dict[str, Any]: - """ - Phase 5C: Generiert Dokument-Struktur mit Sections. 
- Jede Section spezifiziert: - - Welcher Content sollte in dieser Section sein - - Welche ContentParts zu verwenden sind - - Format für jeden ContentPart - - Args: - userPrompt: User-Anfrage - contentParts: Alle vorbereiteten ContentParts mit Metadaten - outputFormat: Ziel-Format (html, docx, pdf, etc.) - parentOperationId: Parent Operation-ID für ChatLog-Hierarchie - - Returns: - Struktur-Dict mit documents und sections - """ - # Erstelle Operation-ID für Struktur-Generierung - structureOperationId = f"{parentOperationId}_structure_generation" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - structureOperationId, - "Structure Generation", - "Structure", - f"Generating structure for {outputFormat}", - parentOperationId=parentOperationId - ) - - try: - # Baue Struktur-Prompt mit Content-Index - structurePrompt = self._buildStructurePrompt( - userPrompt=userPrompt, - contentParts=contentParts, - outputFormat=outputFormat - ) - - # AI-Call für Struktur-Generierung (verwende callAiPlanning für einfache JSON-Responses) - # Debug-Logs werden bereits von callAiPlanning geschrieben - aiResponse = await self.callAiPlanning( - prompt=structurePrompt, - debugType="document_generation_structure" - ) - - # Parse Struktur - structure = json.loads(self.services.utils.jsonExtractString(aiResponse)) - - # ChatLog abschließen - self.services.chat.progressLogFinish(structureOperationId, True) - - return structure - - except Exception as e: - self.services.chat.progressLogFinish(structureOperationId, False) - logger.error(f"Error in _generateStructure: {str(e)}") - raise - - def _buildStructurePrompt( - self, - userPrompt: str, - contentParts: List[ContentPart], - outputFormat: str - ) -> str: - """Baue Prompt für Struktur-Generierung.""" - # Baue ContentParts-Index - filtere leere Parts heraus - contentPartsIndex = "" - validParts = [] - filteredParts = [] - - for part in contentParts: - contentFormat = part.metadata.get("contentFormat", "unknown") - 
- # WICHTIG: Reference Parts haben absichtlich leere Daten - immer einschließen - if contentFormat == "reference": - validParts.append(part) - logger.debug(f"Including reference ContentPart {part.id} (intentionally empty data)") - continue - - # Überspringe leere Parts (keine Daten oder nur Container ohne Inhalt) - # ABER: Reference Parts wurden bereits oben behandelt - if not part.data or (isinstance(part.data, str) and len(part.data.strip()) == 0): - # Überspringe Container-Parts ohne Daten - if part.typeGroup == "container" and not part.data: - filteredParts.append((part.id, "container without data")) - continue - # Überspringe andere leere Parts (aber nicht Reference, die wurden bereits behandelt) - if not part.data: - filteredParts.append((part.id, f"no data (format: {contentFormat})")) - continue - - validParts.append(part) - logger.debug(f"Including ContentPart {part.id}: format={contentFormat}, type={part.typeGroup}, dataLength={len(str(part.data)) if part.data else 0}") - - if filteredParts: - logger.debug(f"Filtered out {len(filteredParts)} empty ContentParts: {filteredParts}") - - logger.info(f"Building structure prompt with {len(validParts)} valid ContentParts (from {len(contentParts)} total)") - - # Baue Index nur für gültige Parts - for i, part in enumerate(validParts, 1): - contentFormat = part.metadata.get("contentFormat", "unknown") - dataPreview = "" - - if contentFormat == "extracted": - # Für Image-Parts: Zeige dass es ein Image ist - if part.typeGroup == "image": - dataLength = len(part.data) if part.data else 0 - mimeType = part.mimeType or "image" - dataPreview = f"Image data ({mimeType}, {dataLength} chars) - base64 encoded image content" - elif part.typeGroup == "container": - # Container ohne Daten überspringen wir bereits oben - dataPreview = "Container structure (no text content)" - else: - # Zeige Preview von extrahiertem Text - if part.data: - preview = part.data[:200] + "..." 
if len(part.data) > 200 else part.data - dataPreview = preview - else: - dataPreview = "(empty)" - elif contentFormat == "object": - dataLength = len(part.data) if part.data else 0 - mimeType = part.mimeType or "binary" - if part.typeGroup == "image": - dataPreview = f"Base64 encoded image ({mimeType}, {dataLength} chars)" - else: - dataPreview = f"Base64 encoded binary ({mimeType}, {dataLength} chars)" - elif contentFormat == "reference": - dataPreview = part.metadata.get("documentReference", "reference") - - originalFileName = part.metadata.get('originalFileName', 'N/A') - - contentPartsIndex += f"\n{i}. ContentPart ID: {part.id}\n" - contentPartsIndex += f" Format: {contentFormat}\n" - contentPartsIndex += f" Type: {part.typeGroup}\n" - contentPartsIndex += f" MIME Type: {part.mimeType or 'N/A'}\n" - contentPartsIndex += f" Source: {part.metadata.get('documentId', 'unknown')}\n" - contentPartsIndex += f" Original file name: {originalFileName}\n" - contentPartsIndex += f" Usage hint: {part.metadata.get('usageHint', 'N/A')}\n" - contentPartsIndex += f" Data preview: {dataPreview}\n" - - if not contentPartsIndex: - contentPartsIndex = "\n(No content parts available)" - - prompt = f"""USER REQUEST: -{userPrompt} - -AVAILABLE CONTENT PARTS: -{contentPartsIndex} - -TASK: Generiere Dokument-Struktur mit Sections. -Für jede Section, spezifiziere: -- section id -- content_type (heading, paragraph, image, table, etc.) 
-- contentPartIds: [Liste von ContentPart-IDs zu verwenden] -- contentFormats: {{"partId": "reference|object|extracted"}} - Wie jeder ContentPart zu verwenden ist -- generation_hint: Was AI für diese Section generieren soll -- elements: [] (leer, wird in nächster Phase gefüllt) - -OUTPUT FORMAT: {outputFormat} - -RETURN JSON: -{{ - "metadata": {{ - "title": "Document Title", - "language": "de" - }}, - "documents": [{{ - "id": "doc_1", - "title": "Document Title", - "filename": "document.{outputFormat}", - "sections": [ - {{ - "id": "section_1", - "content_type": "heading", - "generation_hint": "Main title", - "contentPartIds": [], - "contentFormats": {{}}, - "elements": [] - }}, - {{ - "id": "section_2", - "content_type": "paragraph", - "generation_hint": "Introduction paragraph", - "contentPartIds": ["part_ext_1"], - "contentFormats": {{ - "part_ext_1": "extracted" - }}, - "elements": [] - }} - ] - }}] -}} - -Return ONLY valid JSON following the structure above. -""" - return prompt - async def _fillStructure( self, structure: Dict[str, Any], @@ -2315,525 +487,6 @@ Return ONLY valid JSON following the structure above. structure, contentParts, userPrompt, parentOperationId ) - async def _fillStructure_OLD( - self, - structure: Dict[str, Any], - contentParts: List[ContentPart], - userPrompt: str, - parentOperationId: str - ) -> Dict[str, Any]: - """ - Phase 5D: Füllt Struktur mit tatsächlichem Content. 
- Für jede Section: - - Wenn contentPartIds spezifiziert: Verwende ContentParts im spezifizierten Format - - Wenn generation_hint spezifiziert: Generiere AI-Content - - **Implementierungsdetails:** - - Sections werden **parallel generiert**, wenn möglich (Performance-Optimierung) - - Fehlerhafte Sections werden mit Fehlermeldung gerendert (kein Abbruch des gesamten Prozesses) - - Args: - structure: Struktur-Dict mit documents und sections - contentParts: Alle vorbereiteten ContentParts - userPrompt: User-Anfrage - parentOperationId: Parent Operation-ID für ChatLog-Hierarchie - - Returns: - Gefüllte Struktur mit elements in jeder Section - """ - import copy - - # Erstelle Operation-ID für Struktur-Abfüllen - fillOperationId = f"{parentOperationId}_structure_filling" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - fillOperationId, - "Structure Filling", - "Filling", - f"Filling {len(structure.get('documents', [{}])[0].get('sections', []))} sections", - parentOperationId=parentOperationId - ) - - try: - filledStructure = copy.deepcopy(structure) - - # Sammle alle Sections für sequenzielle Verarbeitung (parallel kann später optimiert werden) - sections_to_process = [] - all_sections_list = [] # Für Kontext-Informationen - for doc in filledStructure.get("documents", []): - doc_sections = doc.get("sections", []) - all_sections_list.extend(doc_sections) - for section in doc_sections: - sections_to_process.append((doc, section)) - - # Sequenzielle Section-Generierung (parallel kann später hinzugefügt werden) - for sectionIndex, (doc, section) in enumerate(sections_to_process): - sectionId = section.get("id") - contentPartIds = section.get("contentPartIds", []) - contentFormats = section.get("contentFormats", {}) - generationHint = section.get("generation_hint") - contentType = section.get("content_type", "paragraph") - - elements = [] - - # Prüfe ob Aggregation nötig ist - needsAggregation = self._needsAggregation( - 
contentType=contentType, - contentPartCount=len(contentPartIds) - ) - - if needsAggregation and generationHint: - # Aggregation: Alle Parts zusammen verarbeiten - sectionParts = [ - self._findContentPartById(pid, contentParts) - for pid in contentPartIds - ] - sectionParts = [p for p in sectionParts if p is not None] - - if sectionParts: - # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt) - extractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted" - ] - nonExtractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted" - ] - - # Verarbeite non-extracted Parts separat (reference, object) - for part in nonExtractedParts: - contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat")) - - if contentFormat == "reference": - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - elif contentFormat == "object": - elements.append({ - "type": part.typeGroup, - "base64Data": part.data, - "mimeType": part.mimeType, - "altText": part.metadata.get("usageHint", part.label) - }) - - # Aggregiere extracted Parts mit AI - if extractedParts: - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=extractedParts, # ALLE PARTS für Aggregation! 
- userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=True - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation (Aggregation)", - "Section", - f"Generating section {sectionId} with {len(extractedParts)} parts", - parentOperationId=fillOperationId - ) - - try: - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"section_content_{sectionId}_prompt" - ) - - # Verwende callAi für ContentParts-Unterstützung (nicht callAiPlanning!) - request = AiCallRequest( - prompt=generationPrompt, - contentParts=extractedParts, # ALLE PARTS! - options=AiCallOptions( - operationType=OperationTypeEnum.DATA_ANALYSE, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.callAi(request) - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"section_content_{sectionId}_response" - ) - - # Parse und füge zu elements hinzu - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) 
- self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # NICHT raise - Section wird mit Fehlermeldung gerendert - - else: - # Einzelverarbeitung: Jeder Part einzeln - for partId in contentPartIds: - part = self._findContentPartById(partId, contentParts) - if not part: - continue - - contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat")) - - if contentFormat == "reference": - # Füge Dokument-Referenz hinzu - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - - elif contentFormat == "object": - # Füge base64 Object hinzu - elements.append({ - "type": part.typeGroup, # "image", "binary", etc. - "base64Data": part.data, - "mimeType": part.mimeType, - "altText": part.metadata.get("usageHint", part.label) - }) - - elif contentFormat == "extracted": - if generationHint: - # AI-Call mit einzelnen ContentPart - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=[part], # EIN PART - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=False - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation", - "Section", - f"Generating section {sectionId}", - parentOperationId=fillOperationId - ) - - try: - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"section_content_{sectionId}_prompt" - ) - - # Verwende callAi für ContentParts-Unterstützung - request = AiCallRequest( - prompt=generationPrompt, - 
contentParts=[part], - options=AiCallOptions( - operationType=OperationTypeEnum.DATA_ANALYSE, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.callAi(request) - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"section_content_{sectionId}_response" - ) - - # Parse und füge zu elements hinzu - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) - self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # NICHT raise - Section wird mit Fehlermeldung gerendert - else: - # Füge extrahierten Text direkt hinzu (kein AI-Call) - elements.append({ - "type": "extracted_text", - "content": part.data, - "source": part.metadata.get("documentId"), - "extractionPrompt": part.metadata.get("extractionPrompt") - }) - - section["elements"] = elements - - # ChatLog abschließen - self.services.chat.progressLogFinish(fillOperationId, True) - - return filledStructure - - except Exception as e: - self.services.chat.progressLogFinish(fillOperationId, False) - logger.error(f"Error in _fillStructure: {str(e)}") - raise - - def _buildSectionGenerationPrompt( - self, - section: Dict[str, Any], - contentParts: List[Optional[ContentPart]], - userPrompt: str, - generationHint: str, - allSections: Optional[List[Dict[str, Any]]] = None, - sectionIndex: Optional[int] 
= None, - isAggregation: bool = False - ) -> str: - """Baue Prompt für Section-Generierung mit vollständigem Kontext.""" - # Filtere None-Werte - validParts = [p for p in contentParts if p is not None] - - # Section-Metadaten - sectionId = section.get("id", "unknown") - contentType = section.get("content_type", "paragraph") - - # Baue ContentParts-Beschreibung - contentPartsText = "" - if isAggregation: - # Aggregation: Zeige nur Metadaten, nicht Previews - contentPartsText += f"\n## CONTENT PARTS (Aggregation)\n" - contentPartsText += f"- Anzahl: {len(validParts)} ContentParts\n" - contentPartsText += f"- Alle ContentParts werden als Parameter übergeben (nicht im Prompt!)\n" - contentPartsText += f"- Jeder Part kann sehr groß sein → Chunking automatisch\n" - contentPartsText += f"- WICHTIG: Aggregiere ALLE Parts zu einem Element (z.B. eine Tabelle)\n\n" - contentPartsText += f"ContentPart IDs:\n" - for part in validParts: - contentFormat = part.metadata.get("contentFormat", "unknown") - contentPartsText += f" - {part.id} (Format: {contentFormat}, Type: {part.typeGroup}" - if part.metadata.get("originalFileName"): - contentPartsText += f", Source: {part.metadata.get('originalFileName')}" - contentPartsText += ")\n" - else: - # Einzelverarbeitung: Zeige Previews - for part in validParts: - contentFormat = part.metadata.get("contentFormat", "unknown") - contentPartsText += f"\n- ContentPart {part.id}:\n" - contentPartsText += f" Format: {contentFormat}\n" - contentPartsText += f" Type: {part.typeGroup}\n" - if part.metadata.get("originalFileName"): - contentPartsText += f" Source file: {part.metadata.get('originalFileName')}\n" - - if contentFormat == "extracted": - # Zeige Preview von extrahiertem Text (länger für besseren Kontext) - previewLength = 1000 - if part.data: - preview = part.data[:previewLength] + "..." 
if len(part.data) > previewLength else part.data - contentPartsText += f" Content preview:\n```\n{preview}\n```\n" - else: - contentPartsText += f" Content: (empty)\n" - elif contentFormat == "reference": - contentPartsText += f" Reference: {part.metadata.get('documentReference')}\n" - if part.metadata.get("usageHint"): - contentPartsText += f" Usage hint: {part.metadata.get('usageHint')}\n" - elif contentFormat == "object": - dataLength = len(part.data) if part.data else 0 - contentPartsText += f" Object type: {part.typeGroup}\n" - contentPartsText += f" MIME type: {part.mimeType}\n" - contentPartsText += f" Data size: {dataLength} chars (base64 encoded)\n" - if part.metadata.get("usageHint"): - contentPartsText += f" Usage hint: {part.metadata.get('usageHint')}\n" - - # Baue Section-Kontext (vorherige und nachfolgende Sections) - contextText = "" - if allSections and sectionIndex is not None: - prevSections = [] - nextSections = [] - - if sectionIndex > 0: - for i in range(max(0, sectionIndex - 2), sectionIndex): - prevSection = allSections[i] - prevSections.append({ - "id": prevSection.get("id"), - "content_type": prevSection.get("content_type"), - "generation_hint": prevSection.get("generation_hint", "")[:100] - }) - - if sectionIndex < len(allSections) - 1: - for i in range(sectionIndex + 1, min(len(allSections), sectionIndex + 3)): - nextSection = allSections[i] - nextSections.append({ - "id": nextSection.get("id"), - "content_type": nextSection.get("content_type"), - "generation_hint": nextSection.get("generation_hint", "")[:100] - }) - - if prevSections or nextSections: - contextText = "\n## DOCUMENT CONTEXT\n" - if prevSections: - contextText += "\nPrevious sections:\n" - for prev in prevSections: - contextText += f"- {prev['id']} ({prev['content_type']}): {prev['generation_hint']}\n" - if nextSections: - contextText += "\nFollowing sections:\n" - for next in nextSections: - contextText += f"- {next['id']} ({next['content_type']}): 
{next['generation_hint']}\n" - - if isAggregation: - prompt = f"""# TASK: Generate Section Content (Aggregation) - -## SECTION METADATA -- Section ID: {sectionId} -- Content Type: {contentType} -- Generation Hint: {generationHint} -{contextText} - -## USER REQUEST (for context) -``` -{userPrompt} -``` - -## AVAILABLE CONTENT FOR THIS SECTION -{contentPartsText if contentPartsText else "(No content parts specified for this section)"} - -## INSTRUCTIONS -1. Generate content for section "{sectionId}" based on the generation hint above -2. **AGGREGATION**: Combine ALL provided ContentParts into ONE element (e.g., one table with all data) -3. For table content_type: Create a single table with headers and rows from all ContentParts -4. For bullet_list content_type: Create a single list with items from all ContentParts -5. Format appropriately based on content_type ({contentType}) -6. Ensure the generated content fits logically between previous and following sections -7. Return ONLY a JSON object with an "elements" array -8. Each element should match the content_type: {contentType} - -## OUTPUT FORMAT -Return a JSON object with this structure: -```json -{{ - "elements": [ - {{ - "type": "{contentType}", - "headers": [...], // if table - "rows": [...], // if table - "items": [...], // if bullet_list - "content": "..." // if paragraph - }} - ] -}} -``` - -CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside the JSON. -""" - else: - prompt = f"""# TASK: Generate Section Content - -## SECTION METADATA -- Section ID: {sectionId} -- Content Type: {contentType} -- Generation Hint: {generationHint} -{contextText} - -## USER REQUEST (for context) -``` -{userPrompt} -``` - -## AVAILABLE CONTENT FOR THIS SECTION -{contentPartsText if contentPartsText else "(No content parts specified for this section)"} - -## INSTRUCTIONS -1. Generate content for section "{sectionId}" based on the generation hint above -2. 
Use the available content parts to populate this section -3. For images: Use data URI format (data:image/[type];base64,[data]) when embedding base64 image data -4. For extracted text: Format appropriately based on content_type ({contentType}) -5. Ensure the generated content fits logically between previous and following sections -6. Return ONLY a JSON object with an "elements" array -7. Each element should match the content_type: {contentType} - -## OUTPUT FORMAT -Return a JSON object with this structure: -```json -{{ - "elements": [ - {{ - "type": "{contentType}", - "content": "..." - }} - ] -}} -``` - -CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside the JSON. -""" - return prompt - - def _findContentPartById(self, partId: str, contentParts: List[ContentPart]) -> Optional[ContentPart]: - """Finde ContentPart nach ID.""" - for part in contentParts: - if part.id == partId: - return part - return None - - def _needsAggregation( - self, - contentType: str, - contentPartCount: int - ) -> bool: - """ - Bestimmt ob mehrere ContentParts aggregiert werden müssen. - - Aggregation nötig wenn: - - content_type erfordert Aggregation (table, bullet_list) - - UND mehrere ContentParts vorhanden sind (> 1) - - Args: - contentType: Section content_type - contentPartCount: Anzahl der ContentParts in dieser Section - - Returns: - True wenn Aggregation nötig, False sonst - """ - aggregationTypes = ["table", "bullet_list"] - - if contentType in aggregationTypes and contentPartCount > 1: - return True - - # Optional: Auch für paragraph wenn mehrere Parts vorhanden - # (z.B. 
Vergleich mehrerer Dokumente) - # Standard: Keine Aggregation für paragraph - return False - async def _renderResult( self, filledStructure: Dict[str, Any], diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py new file mode 100644 index 00000000..8ebafd23 --- /dev/null +++ b/modules/services/serviceAi/subAiCallLooping.py @@ -0,0 +1,533 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +AI Call Looping Module + +Handles AI calls with looping and repair logic, including: +- Looping with JSON repair and continuation +- KPI definition and tracking +- Progress tracking and iteration management +""" +import json +import logging +from typing import Dict, Any, List, Optional, Callable + +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState +from modules.datamodels.datamodelExtraction import ContentPart +from modules.shared.jsonUtils import buildContinuationContext, extractJsonString +from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler + +logger = logging.getLogger(__name__) + + +class AiCallLooper: + """Handles AI calls with looping and repair logic.""" + + def __init__(self, services, aiService, responseParser): + """Initialize AiCallLooper with service center, AI service, and response parser access.""" + self.services = services + self.aiService = aiService + self.responseParser = responseParser + + async def callAiWithLooping( + self, + prompt: str, + options: AiCallOptions, + debugPrefix: str = "ai_call", + promptBuilder: Optional[Callable] = None, + promptArgs: Optional[Dict[str, Any]] = None, + operationId: Optional[str] = None, + userPrompt: Optional[str] = None, + contentParts: Optional[List[ContentPart]] = None # ARCHITECTURE: Support ContentParts for large content + ) -> str: + """ + Shared core function for AI calls with repair-based looping system. 
+ Automatically repairs broken JSON and continues generation seamlessly. + + Args: + prompt: The prompt to send to AI + options: AI call configuration options + debugPrefix: Prefix for debug file names + promptBuilder: Optional function to rebuild prompts for continuation + promptArgs: Optional arguments for prompt builder + operationId: Optional operation ID for progress tracking + userPrompt: Optional user prompt for KPI definition + contentParts: Optional content parts for first iteration + + Returns: + Complete AI response after all iterations + """ + maxIterations = 50 # Prevent infinite loops + iteration = 0 + allSections = [] # Accumulate all sections across iterations + lastRawResponse = None # Store last raw JSON response for continuation + documentMetadata = None # Store document metadata (title, filename) from first iteration + accumulationState = None # Track accumulation state for string accumulation + + # Get parent operation ID for iteration operations (parentId should be operationId, not log entry ID) + parentOperationId = operationId # Use the parent's operationId directly + + while iteration < maxIterations: + iteration += 1 + + # Create separate operation for each iteration with parent reference + iterationOperationId = None + if operationId: + iterationOperationId = f"{operationId}_iter_{iteration}" + self.services.chat.progressLogStart( + iterationOperationId, + "AI Call", + f"Iteration {iteration}", + "", + parentOperationId=parentOperationId + ) + + # Build iteration prompt + # CRITICAL: Build continuation prompt if we have sections OR if we have a previous response (even if broken) + # This ensures continuation prompts are built even when JSON is so broken that no sections can be extracted + if (len(allSections) > 0 or lastRawResponse) and promptBuilder and promptArgs: + # This is a continuation - build continuation context with raw JSON and rebuild prompt + continuationContext = buildContinuationContext(allSections, lastRawResponse) + if 
not lastRawResponse: + logger.warning(f"Iteration {iteration}: No previous response available for continuation!") + + # Filter promptArgs to only include parameters that buildGenerationPrompt accepts + # buildGenerationPrompt accepts: outputFormat, userPrompt, title, extracted_content, continuationContext, services + filteredPromptArgs = { + k: v for k, v in promptArgs.items() + if k in ['outputFormat', 'userPrompt', 'title', 'extracted_content', 'services'] + } + # Always include services if available + if not filteredPromptArgs.get('services') and hasattr(self, 'services'): + filteredPromptArgs['services'] = self.services + + # Rebuild prompt with continuation context using the provided prompt builder + iterationPrompt = await promptBuilder(**filteredPromptArgs, continuationContext=continuationContext) + else: + # First iteration - use original prompt + iterationPrompt = prompt + + # Make AI call + try: + if iterationOperationId: + self.services.chat.progressLogUpdate(iterationOperationId, 0.3, "Calling AI model") + # ARCHITECTURE: Pass ContentParts directly to AiCallRequest + # This allows model-aware chunking to handle large content properly + # ContentParts are only passed in first iteration (continuations don't need them) + request = AiCallRequest( + prompt=iterationPrompt, + context="", + options=options, + contentParts=contentParts if iteration == 1 else None # Only pass ContentParts in first iteration + ) + + # Write the ACTUAL prompt sent to AI + if iteration == 1: + self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt") + else: + self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}") + + response = await self.aiService.callAi(request) + result = response.content + + # Track bytes for progress reporting + bytesReceived = len(result.encode('utf-8')) if result else 0 + totalBytesSoFar = sum(len(section.get('content', '').encode('utf-8')) if isinstance(section.get('content'), str) else 0 for 
section in allSections) + bytesReceived + + # Update progress after AI call with byte information + if iterationOperationId: + # Format bytes for display (kB or MB) + if totalBytesSoFar < 1024: + bytesDisplay = f"{totalBytesSoFar}B" + elif totalBytesSoFar < 1024 * 1024: + bytesDisplay = f"{totalBytesSoFar / 1024:.1f}kB" + else: + bytesDisplay = f"{totalBytesSoFar / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})") + + # Write raw AI response to debug file + if iteration == 1: + self.services.utils.writeDebugFile(result, f"{debugPrefix}_response") + else: + self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") + + # Emit stats for this iteration (only if workflow exists and has id) + if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id: + try: + self.services.chat.storeWorkflowStat( + self.services.workflow, + response, + f"ai.call.{debugPrefix}.iteration_{iteration}" + ) + except Exception as statError: + # Don't break the main loop if stat storage fails + logger.warning(f"Failed to store workflow stat: {str(statError)}") + + # Check for error response using generic error detection (errorCount > 0 or modelName == "error") + if hasattr(response, 'errorCount') and response.errorCount > 0: + errorMsg = f"Iteration {iteration}: Error response detected (errorCount={response.errorCount}), stopping loop: {result[:200] if result else 'empty'}" + logger.error(errorMsg) + break + + if hasattr(response, 'modelName') and response.modelName == "error": + errorMsg = f"Iteration {iteration}: Error response detected (modelName=error), stopping loop: {result[:200] if result else 'empty'}" + logger.error(errorMsg) + break + + if not result or not result.strip(): + logger.warning(f"Iteration {iteration}: Empty response, stopping") + break + + # Check if this is a text response (not document generation) + # Text responses 
don't need JSON parsing - return immediately after first successful response + isTextResponse = (promptBuilder is None and promptArgs is None) or debugPrefix == "text" + + if isTextResponse: + # For text responses, return the text immediately - no JSON parsing needed + logger.info(f"Iteration {iteration}: Text response received, returning immediately") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) + return result + + # Store raw response for continuation (even if broken) + lastRawResponse = result + + # Extract sections from response (handles both valid and broken JSON) + # Only for document generation (JSON responses) + # CRITICAL: Pass allSections and accumulationState to enable string accumulation + extractedSections, wasJsonComplete, parsedResult, accumulationState = self.responseParser.extractSectionsFromResponse( + result, iteration, debugPrefix, allSections, accumulationState + ) + + # CRITICAL: Merge sections BEFORE KPI validation + # This ensures sections are preserved even if KPI validation fails + if extractedSections: + allSections = JsonResponseHandler.mergeSectionsIntelligently(allSections, extractedSections, iteration) + + # Define KPIs if we just entered accumulation mode (iteration 1, incomplete JSON) + if accumulationState and accumulationState.isAccumulationMode and iteration == 1 and not accumulationState.kpis: + logger.info(f"Iteration {iteration}: Defining KPIs for accumulation tracking") + continuationContext = buildContinuationContext(allSections, result) + # Pass raw response string from first iteration for KPI definition + kpiDefinitions = await self._defineKpisFromPrompt( + userPrompt or prompt, + result, # Pass raw JSON string from first iteration + continuationContext, + debugPrefix + ) + # Initialize KPIs with currentValue = 0 + accumulationState.kpis = [{**kpi, "currentValue": 0} for kpi in kpiDefinitions] + logger.info(f"Defined {len(accumulationState.kpis)} KPIs: {[kpi.get('id') for 
kpi in accumulationState.kpis]}") + + # Extract and validate KPIs (if in accumulation mode with KPIs defined) + if accumulationState and accumulationState.isAccumulationMode and accumulationState.kpis: + # For KPI extraction, prefer accumulated JSON string over repaired JSON + # because repairBrokenJson may lose data (e.g., empty rows array when JSON is incomplete) + updatedKpis = [] + + # First try to extract from parsedResult (repaired JSON) + if parsedResult: + try: + updatedKpis = JsonResponseHandler.extractKpiValuesFromJson( + parsedResult, + accumulationState.kpis + ) + # Check if we got meaningful values (non-zero) + hasValidValues = any(kpi.get("currentValue", 0) > 0 for kpi in updatedKpis) + if not hasValidValues and accumulationState.accumulatedJsonString: + # Repaired JSON has empty values, try accumulated string + logger.debug("Repaired JSON has empty KPI values, trying accumulated JSON string") + updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson( + accumulationState.accumulatedJsonString, + accumulationState.kpis + ) + except Exception as e: + logger.debug(f"Error extracting KPIs from parsedResult: {e}") + updatedKpis = [] + + # If no parsedResult or extraction failed, try accumulated string + if not updatedKpis and accumulationState.accumulatedJsonString: + try: + updatedKpis = JsonResponseHandler.extractKpiValuesFromIncompleteJson( + accumulationState.accumulatedJsonString, + accumulationState.kpis + ) + except Exception as e: + logger.debug(f"Error extracting KPIs from accumulated JSON string: {e}") + updatedKpis = [] + + if updatedKpis: + shouldProceed, reason = JsonResponseHandler.validateKpiProgression( + accumulationState, + updatedKpis + ) + + if not shouldProceed: + logger.warning(f"Iteration {iteration}: KPI validation failed: {reason}") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, False) + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.9, f"KPI validation 
failed: {reason} ({iteration} iterations)") + break + + # Update KPIs in accumulation state + accumulationState.kpis = updatedKpis + logger.info(f"Iteration {iteration}: KPIs updated: {[(kpi.get('id'), kpi.get('currentValue')) for kpi in updatedKpis]}") + + # Check if all KPIs completed + allCompleted = True + for kpi in updatedKpis: + targetValue = kpi.get("targetValue", 0) + currentValue = kpi.get("currentValue", 0) + if currentValue < targetValue: + allCompleted = False + break + + if allCompleted: + logger.info(f"Iteration {iteration}: All KPIs completed, finishing accumulation") + wasJsonComplete = True # Mark as complete to exit loop + + # CRITICAL: Handle JSON fragments (continuation content) + # Fragment merging happens inside extractSectionsFromResponse + # If merge fails (returns wasJsonComplete=True), stop iterations and complete JSON + if not extractedSections and allSections: + if wasJsonComplete: + # Merge failed - stop iterations, complete JSON with available data + logger.error(f"Iteration {iteration}: ❌ MERGE FAILED - Stopping iterations, completing JSON with available data") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, False) + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.9, f"Merge failed, completing JSON ({iteration} iterations)") + break + + # Fragment was detected and merged successfully + logger.info(f"Iteration {iteration}: JSON fragment detected and merged, continuing") + # Don't break - fragment was merged, continue to get more content if needed + # Check if we should continue based on JSON completeness + shouldContinue = self.responseParser.shouldContinueGeneration( + allSections, + iteration, + wasJsonComplete, + result + ) + if shouldContinue: + if iterationOperationId: + self.services.chat.progressLogUpdate(iterationOperationId, 0.8, "Fragment merged, continuing") + self.services.chat.progressLogFinish(iterationOperationId, True) + continue + else: + # Done - fragment 
was merged and JSON is complete + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) + if operationId: + self.services.chat.progressLogUpdate(operationId, 0.95, f"Generation complete ({iteration} iterations, fragment merged)") + logger.info(f"Generation complete after {iteration} iterations: fragment merged") + break + + # Extract document metadata from first iteration if available + if iteration == 1 and parsedResult and not documentMetadata: + documentMetadata = self.responseParser.extractDocumentMetadata(parsedResult) + + # Update progress after parsing + if iterationOperationId: + if extractedSections: + self.services.chat.progressLogUpdate(iterationOperationId, 0.8, f"Extracted {len(extractedSections)} sections") + + if not extractedSections: + # CRITICAL: If JSON was incomplete/broken, continue even if no sections extracted + # This allows the AI to retry and complete the broken JSON + if not wasJsonComplete: + logger.warning(f"Iteration {iteration}: No sections extracted from broken JSON, continuing for another attempt") + continue + # If JSON was complete but no sections extracted - check if it was a fragment + # Fragments are handled above, so if we get here and it's complete, it's an error + logger.warning(f"Iteration {iteration}: No sections extracted from complete JSON, stopping") + break + + # NOTE: Section merging now happens BEFORE KPI validation (see above) + # This ensures sections are preserved even if KPI validation fails + + # Calculate total bytes in merged content for progress display + merged_json_str = json.dumps(allSections, indent=2, ensure_ascii=False) + totalBytesGenerated = len(merged_json_str.encode('utf-8')) + + # Update main operation with byte progress + if operationId: + # Format bytes for display + if totalBytesGenerated < 1024: + bytesDisplay = f"{totalBytesGenerated}B" + elif totalBytesGenerated < 1024 * 1024: + bytesDisplay = f"{totalBytesGenerated / 1024:.1f}kB" + else: + bytesDisplay = 
f"{totalBytesGenerated / (1024 * 1024):.1f}MB" + # Estimate progress based on iterations (rough estimate) + estimatedProgress = min(0.9, 0.4 + (iteration * 0.1)) + self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})") + + # Log merged sections for debugging + self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}") + + # Check if we should continue (completion detection) + # Simple logic: JSON completeness determines continuation + shouldContinue = self.responseParser.shouldContinueGeneration( + allSections, + iteration, + wasJsonComplete, + result + ) + + if shouldContinue: + # Finish iteration operation (will continue with next iteration) + if iterationOperationId: + # Show byte progress in iteration completion + iterBytes = len(result.encode('utf-8')) if result else 0 + if iterBytes < 1024: + iterBytesDisplay = f"{iterBytes}B" + elif iterBytes < 1024 * 1024: + iterBytesDisplay = f"{iterBytes / 1024:.1f}kB" + else: + iterBytesDisplay = f"{iterBytes / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Completed ({iterBytesDisplay})") + self.services.chat.progressLogFinish(iterationOperationId, True) + continue + else: + # Done - finish iteration and update main operation + if iterationOperationId: + # Show final byte count + finalBytes = len(merged_json_str.encode('utf-8')) + if finalBytes < 1024: + finalBytesDisplay = f"{finalBytes}B" + elif finalBytes < 1024 * 1024: + finalBytesDisplay = f"{finalBytes / 1024:.1f}kB" + else: + finalBytesDisplay = f"{finalBytes / (1024 * 1024):.1f}MB" + self.services.chat.progressLogUpdate(iterationOperationId, 0.95, f"Complete ({finalBytesDisplay})") + self.services.chat.progressLogFinish(iterationOperationId, True) + if operationId: + # Show final size in main operation + finalBytes = len(merged_json_str.encode('utf-8')) + if finalBytes < 1024: + finalBytesDisplay = 
async def _defineKpisFromPrompt(
    self,
    userPrompt: str,
    rawJsonString: Optional[str],
    continuationContext: Dict[str, Any],
    debugPrefix: str = "kpi"
) -> List[Dict[str, Any]]:
    """
    Make a separate AI call to define KPIs based on the user prompt and incomplete JSON.

    The model's answer is validated before it is returned, because callers
    spread each entry into a new dict and compare ``targetValue`` numerically:
    entries that are not JSON objects, or whose ``targetValue`` is present but
    not numeric, are dropped. Numeric strings (e.g. ``"100"``) are converted
    to ``int``. Any failure (AI call, JSON parsing, unexpected response shape)
    is logged and results in an empty list, i.e. no KPI tracking.

    Args:
        userPrompt: Original user prompt
        rawJsonString: Raw JSON string from first iteration response (may be None/empty)
        continuationContext: Continuation context (not read by this method, kept for compatibility)
        debugPrefix: Prefix for debug file names

    Returns:
        List of KPI definitions: [{"id": str, "description": str, "jsonPath": str, "targetValue": int}, ...]
        Empty list if no KPIs could be defined.
    """
    # Use raw JSON string from first iteration response
    if rawJsonString:
        # Remove markdown code fences if present
        from modules.shared.jsonUtils import stripCodeFences
        incompleteJson = stripCodeFences(rawJsonString.strip())
    else:
        incompleteJson = "Not available"

    kpiDefinitionPrompt = f"""Analyze the user request and incomplete JSON to define KPIs (Key Performance Indicators) for tracking progress.

User Request:
{userPrompt}

Delivered JSON part:
{incompleteJson}

Task: Define which JSON items should be tracked to measure completion progress.

IMPORTANT: Analyze the Delivered JSON part structure to understand what is being tracked:
1. Identify the structure type (table with rows, list with items, etc.)
2. Determine what the jsonPath actually counts (number of rows, number of items, etc.)
3. Calculate targetValue based on what is being tracked, NOT the total quantity requested

For each trackable item, provide:
- id: Unique identifier (use descriptive name)
- description: What this KPI measures (be specific about what is counted)
- jsonPath: Path to extract value from JSON (use dot notation with array indices, e.g., "documents[0].sections[1].elements[0].rows")
- targetValue: Target value to reach (integer) - MUST match what jsonPath actually tracks (rows count, items count, etc.)

Return ONLY valid JSON in this format:
{{
  "kpis": [
    {{
      "id": "unique_id",
      "description": "Description of what is measured",
      "jsonPath": "path.to.value",
      "targetValue": 0
    }}
  ]
}}

If no trackable items can be identified, return: {{"kpis": []}}
"""

    try:
        request = AiCallRequest(
            prompt=kpiDefinitionPrompt,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.SPEED,
                processingMode=ProcessingModeEnum.BASIC
            )
        )

        # Write KPI definition prompt to debug file
        self.services.utils.writeDebugFile(kpiDefinitionPrompt, f"{debugPrefix}_kpi_definition_prompt")

        response = await self.aiService.callAi(request)

        # Write KPI definition response to debug file
        self.services.utils.writeDebugFile(response.content, f"{debugPrefix}_kpi_definition_response")

        # Parse response
        extracted = extractJsonString(response.content)
        kpiResponse = json.loads(extracted)

        # The model may answer with any JSON value; only an object holding a
        # "kpis" list is usable.
        rawKpis = kpiResponse.get("kpis", []) if isinstance(kpiResponse, dict) else None
        if not isinstance(rawKpis, list):
            logger.warning("Failed to define KPIs: response has no 'kpis' list, continuing without KPI tracking")
            return []

        kpiDefinitions: List[Dict[str, Any]] = []
        for entry in rawKpis:
            # Callers spread each entry into a dict, so non-objects would crash them.
            if not isinstance(entry, dict):
                logger.warning(f"Ignoring malformed KPI definition (not an object): {entry!r}")
                continue

            # A missing targetValue is tolerated (callers default it to 0), but a
            # present one must be numeric because it is compared with "<".
            if "targetValue" in entry:
                targetValue = entry["targetValue"]
                if isinstance(targetValue, str):
                    try:
                        targetValue = int(targetValue.strip())
                    except ValueError:
                        targetValue = None
                if isinstance(targetValue, bool) or not isinstance(targetValue, (int, float)):
                    logger.warning(f"Ignoring KPI definition with non-numeric targetValue: {entry!r}")
                    continue
                # Copy instead of mutating the parsed response in place.
                entry = {**entry, "targetValue": targetValue}

            kpiDefinitions.append(entry)

        logger.info(f"Defined {len(kpiDefinitions)} KPIs for tracking")

        return kpiDefinitions

    except Exception as e:
        # Best effort: KPI tracking is optional, so never propagate failures.
        logger.warning(f"Failed to define KPIs: {e}, continuing without KPI tracking")
        return []