gateway/modules/features/aichat/serviceAi/subLoopingUseCases.py
2026-01-22 17:00:29 +01:00

293 lines
13 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Generic Looping Use Case System
Provides parametrized looping infrastructure supporting different JSON formats and use cases.
"""
import logging
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional, Callable
logger = logging.getLogger(__name__)
# Callback functions for use-case-specific logic
def _handleSectionContentFinalResult(
    result: str,
    parsedJsonForUseCase: Any,
    extractedJsonForUseCase: str,
    debugPrefix: str,
    services: Any,
) -> str:
    """Final-result callback for the ``section_content`` use case.

    The raw response text is handed back untouched so that every JSON block
    it contains is preserved; no re-serialization takes place.

    Args:
        result: Raw response text; this is also the return value.
        parsedJsonForUseCase: Not used by this handler (kept for the shared
            callback signature).
        extractedJsonForUseCase: Not used by this handler (kept for the shared
            callback signature).
        debugPrefix: Prefix of the debug file name; ``"_response"`` is appended.
        services: Optional object; when it is truthy and exposes
            ``utils.writeDebugFile`` the result is written through it.

    Returns:
        ``result``, unchanged.
    """
    debugWriterAvailable = (
        bool(services)
        and hasattr(services, "utils")
        and hasattr(services.utils, "writeDebugFile")
    )
    if not debugWriterAvailable:
        return result

    # Per the original author's note this is meant to replace the iteration-1
    # response dump with the complete merged result (same "_response" name).
    services.utils.writeDebugFile(result, f"{debugPrefix}_response")
    return result
def _handleChapterStructureFinalResult(
    result: str,
    parsedJsonForUseCase: Any,
    extractedJsonForUseCase: str,
    debugPrefix: str,
    services: Any,
) -> str:
    """Final-result callback for the ``chapter_structure`` use case.

    Picks the best available representation of the result, in this order:
    the parsed JSON pretty-printed (2-space indent, non-ASCII kept as-is),
    then the extracted JSON string, then the raw response.

    Args:
        result: Raw response text, used as the last fallback.
        parsedJsonForUseCase: Parsed JSON value; used when truthy.
        extractedJsonForUseCase: Extracted JSON text; used when the parsed
            value is falsy and this string is non-empty.
        debugPrefix: Prefix of the debug file name; ``"_final_result"`` is
            appended.
        services: Optional object; when it is truthy and exposes
            ``utils.writeDebugFile`` the chosen text is written through it.

    Returns:
        The chosen JSON/text representation.
    """
    import json

    if parsedJsonForUseCase:
        finalText = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False)
    elif extractedJsonForUseCase:
        finalText = extractedJsonForUseCase
    else:
        finalText = result

    canWriteDebug = (
        bool(services)
        and hasattr(services, "utils")
        and hasattr(services.utils, "writeDebugFile")
    )
    if canWriteDebug:
        services.utils.writeDebugFile(finalText, f"{debugPrefix}_final_result")

    return finalText
def _handleCodeStructureFinalResult(
    result: str,
    parsedJsonForUseCase: Any,
    extractedJsonForUseCase: str,
    debugPrefix: str,
    services: Any,
) -> str:
    """Final-result callback for the ``code_structure`` use case.

    A truthy parsed value is serialized with ``json.dumps`` (2-space indent,
    ``ensure_ascii=False``). Otherwise the extracted JSON string is used, and
    if that is empty too, the raw response.

    Args:
        result: Raw response text, used as the last fallback.
        parsedJsonForUseCase: Parsed JSON value; preferred when truthy.
        extractedJsonForUseCase: Extracted JSON text; second choice.
        debugPrefix: Prefix of the debug file name; ``"_final_result"`` is
            appended.
        services: Optional object; the text is written via
            ``services.utils.writeDebugFile`` when that attribute chain exists.

    Returns:
        The chosen JSON/text representation.
    """
    import json

    if parsedJsonForUseCase:
        payload = json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False)
    else:
        payload = extractedJsonForUseCase or result

    # Debug output is optional: skip silently unless the full attribute chain
    # services -> utils -> writeDebugFile is present.
    if services:
        if hasattr(services, "utils"):
            if hasattr(services.utils, "writeDebugFile"):
                services.utils.writeDebugFile(payload, f"{debugPrefix}_final_result")

    return payload
def _handleCodeContentFinalResult(
    result: str,
    parsedJsonForUseCase: Any,
    extractedJsonForUseCase: str,
    debugPrefix: str,
    services: Any,
) -> str:
    """Final-result callback for the ``code_content`` use case.

    Returns the parsed JSON pretty-printed (2-space indent, non-ASCII kept)
    when it is truthy; otherwise the extracted JSON string, or the raw
    response when that is empty as well. This handler writes no debug file.

    Args:
        result: Raw response text, used as the last fallback.
        parsedJsonForUseCase: Parsed JSON value; preferred when truthy.
        extractedJsonForUseCase: Extracted JSON text; second choice.
        debugPrefix: Not used by this handler (kept for the shared callback
            signature).
        services: Not used by this handler (kept for the shared callback
            signature).

    Returns:
        The chosen JSON/text representation.
    """
    import json

    if not parsedJsonForUseCase:
        return extractedJsonForUseCase or result
    return json.dumps(parsedJsonForUseCase, indent=2, ensure_ascii=False)
def _normalizeSectionContentJson(parsed: Any, useCaseId: str) -> Any:
    """Normalize parsed JSON for section_content into ``{"elements": [...]}``.

    Handled shapes:

    * ``list`` without any string items -> wrapped as ``{"elements": parsed}``.
    * ``list`` containing string items -> every non-blank string becomes a
      paragraph element (``{"type": "paragraph", "content": {"text": ...}}``,
      text stripped), dict items are kept in place, anything else is dropped.
    * ``dict`` that already has an ``"elements"`` key -> returned unchanged.
    * ``dict`` with a truthy ``"type"`` (a single element) -> wrapped in a
      one-item ``elements`` list.
    * anything else -> returned unchanged.

    Args:
        parsed: The parsed JSON value to normalize.
        useCaseId: Not used by this normalizer (kept for the shared
            normalizer signature).

    Returns:
        The normalized structure, or ``parsed`` itself when no rule applies.
    """
    if isinstance(parsed, list):
        # Inspect the whole list, not only the first item: looking at
        # parsed[0] alone dropped element dicts from string-led mixed lists
        # and let raw strings through in element-led ones.
        if not any(isinstance(item, str) for item in parsed):
            return {"elements": parsed}

        logger.debug("Received list of strings instead of elements array, converting to paragraph elements")
        elements = []
        for item in parsed:
            if isinstance(item, dict):
                # Already an element object; keep it at its position.
                elements.append(item)
            elif isinstance(item, str):
                text = item.strip()
                if text:  # blank strings carry no content
                    elements.append({
                        "type": "paragraph",
                        "content": {
                            "text": text
                        }
                    })
        return {"elements": elements}

    # A lone element object (has "type" but no "elements") gets wrapped.
    if isinstance(parsed, dict) and "elements" not in parsed and parsed.get("type"):
        return {"elements": [parsed]}

    # Already in the expected shape, or not a structure handled here.
    return parsed
def _normalizeDefaultJson(parsed: Any, useCaseId: str) -> Any:
    """Pass-through normalizer: hand back ``parsed`` exactly as received.

    Args:
        parsed: The parsed JSON value.
        useCaseId: Not used (kept for the shared normalizer signature).

    Returns:
        The very same ``parsed`` object; nothing is copied or modified.
    """
    return parsed
@dataclass
class LoopingUseCase:
    """Declarative configuration record for one looping use case.

    The four leading fields are required; every hook defaults to ``None`` and
    the two flags carry defaults, so a use case can be declared with only its
    identification and detection data.
    """

    # --- Identification -----------------------------------------------------
    # Unique id, e.g. "section_content", "chapter_structure",
    # "code_structure" or "code_content".
    useCaseId: str

    # --- JSON format detection ----------------------------------------------
    # Template of the JSON structure expected for this use case.
    jsonTemplate: Dict[str, Any]
    # Keys whose presence identifies the format
    # (e.g. ["elements"], ["chapters"], ["files"]).
    detectionKeys: List[str]
    # Dotted path with optional [index] segments to probe
    # (e.g. "documents[0].chapters", "files[0].content").
    detectionPath: str

    # --- Prompt building ----------------------------------------------------
    # Builds the initial prompt.
    initialPromptBuilder: Optional[Callable] = None
    # Builds the continuation prompt.
    continuationPromptBuilder: Optional[Callable] = None

    # --- Accumulation & merging ---------------------------------------------
    # Accumulates response fragments.
    accumulator: Optional[Callable] = None
    # Merges the accumulated data.
    merger: Optional[Callable] = None

    # --- Continuation context -----------------------------------------------
    # Builds the continuation context for this format.
    continuationContextBuilder: Optional[Callable] = None

    # --- Result building ----------------------------------------------------
    # Builds the final result from the accumulated data.
    resultBuilder: Optional[Callable] = None

    # --- Use-case-specific callbacks (avoid if/elif chains in generic code) --
    # Formats the final result and writes any debug file.
    finalResultHandler: Optional[Callable] = None
    # Normalizes the JSON structure for this use case.
    jsonNormalizer: Optional[Callable] = None

    # --- Metadata -----------------------------------------------------------
    # Whether this use case supports accumulation.
    supportsAccumulation: bool = True
    # Whether this use case requires extraction (like sections).
    requiresExtraction: bool = False
class LoopingUseCaseRegistry:
    """Registry of looping use cases, keyed by ``useCaseId``.

    The default use cases are registered on construction. Detection walks the
    registry in registration order and returns the first match.
    """

    def __init__(self):
        """Create the registry and register the default use cases."""
        self.useCases: Dict[str, "LoopingUseCase"] = {}
        self._registerDefaultUseCases()

    def register(self, useCase: "LoopingUseCase"):
        """Register a use case; an existing entry with the same id is replaced."""
        self.useCases[useCase.useCaseId] = useCase
        logger.debug("Registered looping use case: %s", useCase.useCaseId)

    def get(self, useCaseId: str) -> Optional["LoopingUseCase"]:
        """Return the use case registered under ``useCaseId``, or ``None``."""
        return self.useCases.get(useCaseId)

    def detectUseCase(self, parsedJson: Dict[str, Any]) -> Optional[str]:
        """Return the id of the first registered use case matching ``parsedJson``.

        Use cases are tried in registration order. Returns ``None`` when
        nothing matches, including when ``parsedJson`` is not a dict.

        NOTE(review): with the default registrations, ``code_structure`` is
        registered before ``code_content`` and matches any dict with a
        top-level ``"files"`` key, so ``code_content`` is only ever detected
        through its top-level ``"content"`` / ``"functions"`` keys; its
        ``"files[0].content"`` path cannot win here.
        """
        for useCaseId, useCase in self.useCases.items():
            if self._matchesFormat(parsedJson, useCase):
                return useCaseId
        return None

    def _matchesFormat(self, json: Dict[str, Any], useCase: "LoopingUseCase") -> bool:
        """Check whether ``json`` matches the format described by ``useCase``.

        A payload matches when it is a dict and either contains one of
        ``useCase.detectionKeys`` at top level or ``useCase.detectionPath``
        can be traversed completely.

        Args:
            json: Parsed JSON payload. (The parameter name shadows the
                ``json`` module inside this method only.)
            useCase: Use case whose detection keys and path are tested.

        Returns:
            ``True`` on a match, ``False`` otherwise. Malformed path indices
            are logged at debug level and count as "no match".
        """
        # Only JSON objects can match. Without this guard `key in json` does
        # list membership / substring search on arrays and strings (false
        # positives) and raises TypeError on None.
        if not isinstance(json, dict):
            return False

        # Top-level key check.
        if any(key in json for key in useCase.detectionKeys):
            return True

        # Nested path check via plain dictionary traversal (no jsonpath_ng).
        if not useCase.detectionPath:
            return False
        try:
            return self._pathExists(json, useCase.detectionPath)
        except ValueError as e:
            # Raised by int() for a non-numeric index such as "files[x]".
            logger.debug("Path matching failed for %s: %s", useCase.useCaseId, e)
            return False

    @staticmethod
    def _pathExists(root: Any, path: str) -> bool:
        """Return True when the dotted ``path`` can be fully traversed from ``root``.

        Supported segments are plain keys (``"chapters"``) and keys with a
        list index (``"documents[0]"``). Raises ``ValueError`` when an index
        is not an integer.
        """
        current = root
        for part in path.split("."):
            if "[" in part and "]" in part:
                # Segment with an array index, e.g. "documents[0]".
                key, _, remainder = part.partition("[")
                index = int(remainder.split("]")[0])
                if not (isinstance(current, dict) and key in current):
                    return False
                items = current[key]
                # Negative and out-of-range indices do not match.
                if not (isinstance(items, list) and 0 <= index < len(items)):
                    return False
                current = items[index]
            elif isinstance(current, dict) and part in current:
                # Regular key access.
                current = current[part]
            else:
                return False
        # Every segment resolved, so the path exists.
        return True

    def _registerDefaultUseCases(self):
        """Register the built-in use cases.

        Registration order matters because ``detectUseCase`` returns the
        first match.
        """
        # Use Case 1: Section Content Generation
        # Returns JSON with "elements" array directly
        self.register(LoopingUseCase(
            useCaseId="section_content",
            jsonTemplate={"elements": []},
            detectionKeys=["elements"],
            detectionPath="",
            initialPromptBuilder=None,  # Will use default prompt builder
            continuationPromptBuilder=None,  # Will use default continuation builder
            accumulator=None,  # Direct return, no accumulation
            merger=None,
            continuationContextBuilder=None,  # Will use default continuation context
            resultBuilder=None,  # Return JSON directly
            finalResultHandler=_handleSectionContentFinalResult,
            jsonNormalizer=_normalizeSectionContentJson,
            supportsAccumulation=False,
            requiresExtraction=False
        ))

        # Use Case 2: Chapter Structure Generation
        # Returns JSON with "documents[0].chapters" structure
        self.register(LoopingUseCase(
            useCaseId="chapter_structure",
            jsonTemplate={"documents": [{"chapters": []}]},
            detectionKeys=["chapters"],
            detectionPath="documents[0].chapters",
            initialPromptBuilder=None,
            continuationPromptBuilder=None,
            accumulator=None,  # Direct return, no accumulation
            merger=None,
            continuationContextBuilder=None,
            resultBuilder=None,  # Return JSON directly
            finalResultHandler=_handleChapterStructureFinalResult,
            jsonNormalizer=_normalizeDefaultJson,
            supportsAccumulation=False,
            requiresExtraction=False
        ))

        # Use Case 3: Code Structure Generation
        self.register(LoopingUseCase(
            useCaseId="code_structure",
            jsonTemplate={
                "metadata": {
                    "language": "",
                    "projectType": "single_file|multi_file",
                    "projectName": ""
                },
                "files": [
                    {
                        "id": "",
                        "filename": "",
                        "fileType": "",
                        "dependencies": [],
                        "imports": [],
                        "functions": [],
                        "classes": []
                    }
                ]
            },
            detectionKeys=["files"],
            detectionPath="files",
            initialPromptBuilder=None,
            continuationPromptBuilder=None,
            accumulator=None,  # Direct return
            merger=None,
            continuationContextBuilder=None,
            resultBuilder=None,
            finalResultHandler=_handleCodeStructureFinalResult,
            jsonNormalizer=_normalizeDefaultJson,
            supportsAccumulation=False,
            requiresExtraction=False
        ))

        # Use Case 5: Code Content Generation (NEW)
        # (numbering kept from the original; no "Use Case 4" is registered here)
        self.register(LoopingUseCase(
            useCaseId="code_content",
            jsonTemplate={"files": [{"content": "", "functions": []}]},
            detectionKeys=["content", "functions"],
            detectionPath="files[0].content",
            initialPromptBuilder=None,
            continuationPromptBuilder=None,
            accumulator=None,  # Will use default accumulator
            merger=None,  # Will use default merger
            continuationContextBuilder=None,
            resultBuilder=None,  # Will use default result builder
            finalResultHandler=_handleCodeContentFinalResult,
            jsonNormalizer=_normalizeDefaultJson,
            supportsAccumulation=True,
            requiresExtraction=False
        ))

        logger.info("Registered %d default looping use cases", len(self.useCases))