# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Generic Looping Use Case System Provides parametrized looping infrastructure supporting different JSON formats and use cases. """ import logging from dataclasses import dataclass, field from typing import Dict, Any, List, Optional, Callable logger = logging.getLogger(__name__) # Callback functions for use-case-specific logic def _handleSectionContentFinalResult(result: str, parsedJsonForUseCase: Any, extractedJsonForUseCase: str, debugPrefix: str, services: Any) -> str: """Handle final result for section_content: return raw result to preserve all JSON blocks.""" final_json = result # Return raw response to preserve all JSON blocks # Write final merged result for section_content (overwrites iteration 1 response with complete merged result) if services and hasattr(services, 'utils') and hasattr(services.utils, 'writeDebugFile'): services.utils.writeDebugFile(final_json, f"{debugPrefix}_response") return final_json def _handleChapterStructureFinalResult(result: str, parsedJsonForUseCase: Any, extractedJsonForUseCase: str, debugPrefix: str, services: Any) -> str: """Handle final result for chapter_structure: format JSON and write debug file.""" import json final_json = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False) if parsedJsonForUseCase else (extractedJsonForUseCase or result) # Write final result for chapter structure if services and hasattr(services, 'utils') and hasattr(services.utils, 'writeDebugFile'): services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result") return final_json def _handleCodeStructureFinalResult(result: str, parsedJsonForUseCase: Any, extractedJsonForUseCase: str, debugPrefix: str, services: Any) -> str: """Handle final result for code_structure: format JSON and write debug file.""" import json final_json = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False) if parsedJsonForUseCase else (extractedJsonForUseCase or result) # Write final result for code structure if services and hasattr(services, 'utils') and hasattr(services.utils, 'writeDebugFile'): services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result") return final_json def _handleCodeContentFinalResult(result: str, parsedJsonForUseCase: Any, extractedJsonForUseCase: str, debugPrefix: str, services: Any) -> str: """Handle final result for code_content: format JSON.""" import json final_json = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False) if parsedJsonForUseCase else (extractedJsonForUseCase or result) return final_json def _normalizeSectionContentJson(parsed: Any, useCaseId: str) -> Any: """Normalize JSON structure for section_content use case.""" # For section_content, expect {"elements": [...]} structure if isinstance(parsed, list): # Check if list contains strings (invalid format) or element objects if parsed and isinstance(parsed[0], str): # Invalid format - list of strings instead of elements # Try to convert strings to paragraph elements as fallback logger.debug(f"Received list of strings instead of elements array, converting to paragraph elements") elements = [] for text in parsed: if isinstance(text, str) and text.strip(): elements.append({ "type": "paragraph", "content": { "text": text.strip() } }) return {"elements": elements} if elements else {"elements": []} else: # Convert plain list of elements to elements structure return {"elements": parsed} elif isinstance(parsed, dict): # If it already has "elements", return as-is if "elements" in parsed: return parsed # If it has "type" and looks like an element, wrap in elements array elif parsed.get("type"): return {"elements": [parsed]} # Otherwise, assume it's already in correct format else: return parsed # For other use cases, return as-is (they have their own structures) return parsed def _normalizeDefaultJson(parsed: Any, useCaseId: str) -> Any: """Default normalizer: return as-is.""" return parsed @dataclass class LoopingUseCase: """Configuration for a specific looping use case.""" # Identification useCaseId: str # "section_content", "chapter_structure", "code_structure", "code_content" # JSON Format Detection jsonTemplate: Dict[str, Any] # Expected JSON structure template detectionKeys: List[str] # Keys to check for format detection (e.g., ["elements"], ["chapters"], ["files"]) detectionPath: str # JSONPath to check (e.g., "documents[0].chapters", "files[0].content") # Prompt Building initialPromptBuilder: Optional[Callable] = None # Function to build initial prompt continuationPromptBuilder: Optional[Callable] = None # Function to build continuation prompt # Accumulation & Merging accumulator: Optional[Callable] = None # Function to accumulate fragments merger: Optional[Callable] = None # Function to merge accumulated data # Continuation Context continuationContextBuilder: Optional[Callable] = None # Build continuation context for this format # Result Building resultBuilder: Optional[Callable] = None # Build final result from accumulated data # Use-case-specific handlers (callbacks to avoid if/elif chains in generic code) finalResultHandler: Optional[Callable] = None # Handle final result formatting and debug file writing jsonNormalizer: Optional[Callable] = None # Normalize JSON structure for this use case # Metadata supportsAccumulation: bool = True # Whether this use case supports accumulation requiresExtraction: bool = False # Whether this requires extraction (like sections) class LoopingUseCaseRegistry: """Registry of all looping use cases.""" def __init__(self): self.useCases: Dict[str, LoopingUseCase] = {} self._registerDefaultUseCases() def register(self, useCase: LoopingUseCase): """Register a new use case.""" self.useCases[useCase.useCaseId] = useCase logger.debug(f"Registered looping use case: {useCase.useCaseId}") def get(self, useCaseId: str) -> Optional[LoopingUseCase]: """Get use case by ID.""" return self.useCases.get(useCaseId) def detectUseCase(self, parsedJson: Dict[str, Any]) -> Optional[str]: """Detect which use case matches the JSON structure.""" for useCaseId, useCase in self.useCases.items(): if self._matchesFormat(parsedJson, useCase): return useCaseId return None def _matchesFormat(self, json: Dict[str, Any], useCase: LoopingUseCase) -> bool: """Check if JSON matches use case format.""" # Check top-level keys for key in useCase.detectionKeys: if key in json: return True # Check nested path using simple dictionary traversal (no jsonpath_ng needed) if useCase.detectionPath: try: # Simple path matching without jsonpath_ng # Format: "documents[0].chapters" or "files[0].content" pathParts = useCase.detectionPath.split(".") current = json for part in pathParts: # Handle array indices like "documents[0]" if "[" in part and "]" in part: key = part.split("[")[0] index = int(part.split("[")[1].split("]")[0]) if isinstance(current, dict) and key in current: if isinstance(current[key], list) and 0 <= index < len(current[key]): current = current[key][index] else: return False else: return False else: # Regular key access if isinstance(current, dict) and part in current: current = current[part] else: return False # If we successfully traversed the path, it matches return True except Exception as e: logger.debug(f"Path matching failed for {useCase.useCaseId}: {e}") return False def _registerDefaultUseCases(self): """Register default use cases.""" # Use Case 1: Section Content Generation # Returns JSON with "elements" array directly self.register(LoopingUseCase( useCaseId="section_content", jsonTemplate={"elements": []}, detectionKeys=["elements"], detectionPath="", initialPromptBuilder=None, # Will use default prompt builder continuationPromptBuilder=None, # Will use default continuation builder accumulator=None, # Direct return, no accumulation merger=None, continuationContextBuilder=None, # Will use default continuation context resultBuilder=None, # Return JSON directly finalResultHandler=_handleSectionContentFinalResult, jsonNormalizer=_normalizeSectionContentJson, supportsAccumulation=False, requiresExtraction=False )) # Use Case 2: Chapter Structure Generation # Returns JSON with "documents[0].chapters" structure self.register(LoopingUseCase( useCaseId="chapter_structure", jsonTemplate={"documents": [{"chapters": []}]}, detectionKeys=["chapters"], detectionPath="documents[0].chapters", initialPromptBuilder=None, continuationPromptBuilder=None, accumulator=None, # Direct return, no accumulation merger=None, continuationContextBuilder=None, resultBuilder=None, # Return JSON directly finalResultHandler=_handleChapterStructureFinalResult, jsonNormalizer=_normalizeDefaultJson, supportsAccumulation=False, requiresExtraction=False )) # Use Case 3: Code Structure Generation self.register(LoopingUseCase( useCaseId="code_structure", jsonTemplate={ "metadata": { "language": "", "projectType": "single_file|multi_file", "projectName": "" }, "files": [ { "id": "", "filename": "", "fileType": "", "dependencies": [], "imports": [], "functions": [], "classes": [] } ] }, detectionKeys=["files"], detectionPath="files", initialPromptBuilder=None, continuationPromptBuilder=None, accumulator=None, # Direct return merger=None, continuationContextBuilder=None, resultBuilder=None, finalResultHandler=_handleCodeStructureFinalResult, jsonNormalizer=_normalizeDefaultJson, supportsAccumulation=False, requiresExtraction=False )) # Use Case 5: Code Content Generation (NEW) self.register(LoopingUseCase( useCaseId="code_content", jsonTemplate={"files": [{"content": "", "functions": []}]}, detectionKeys=["content", "functions"], detectionPath="files[0].content", initialPromptBuilder=None, continuationPromptBuilder=None, accumulator=None, # Will use default accumulator merger=None, # Will use default merger continuationContextBuilder=None, resultBuilder=None, # Will use default result builder finalResultHandler=_handleCodeContentFinalResult, jsonNormalizer=_normalizeDefaultJson, supportsAccumulation=True, requiresExtraction=False )) logger.info(f"Registered {len(self.useCases)} default looping use cases")