From fa57d3683b17e30a9fc649899344cb0f0aecc188 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Tue, 30 Dec 2025 02:06:51 +0100 Subject: [PATCH] parallel processing for rendering --- .../serviceAi/PARALLEL_PROCESSING_CONCEPT.md | 376 ++++ .../services/serviceAi/REFACTORING_PLAN.md | 126 -- modules/services/serviceAi/mainServiceAi.py | 1 + .../services/serviceAi/subAiCallLooping.py | 34 +- .../services/serviceAi/subStructureFilling.py | 1926 ++++++++++------- .../serviceAi/subStructureGeneration.py | 31 +- 6 files changed, 1542 insertions(+), 952 deletions(-) create mode 100644 modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md delete mode 100644 modules/services/serviceAi/REFACTORING_PLAN.md diff --git a/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md new file mode 100644 index 00000000..d8b55298 --- /dev/null +++ b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md @@ -0,0 +1,376 @@ +# Parallel Processing Refactoring Concept + +## Current State (Sequential) + +### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`) +- **Current**: Processes chapters sequentially, one after another +- **Flow**: + 1. Iterate through documents + 2. For each document, iterate through chapters + 3. For each chapter, generate sections structure using AI + 4. Update progress after each chapter + +### Section Content Generation (`_fillChapterSections`) +- **Current**: Processes chapters sequentially, sections within each chapter sequentially +- **Flow**: + 1. Iterate through documents + 2. For each document, iterate through chapters + 3. For each chapter, iterate through sections + 4. For each section, generate content using AI + 5. 
Update progress after each section + +## Desired State (Parallel) + +### Chapter Sections Structure Generation +- **Target**: Process all chapters in parallel +- **Requirements**: + - Maintain chapter order in final result + - Each chapter can be processed independently + - Progress updates should reflect parallel processing + - Errors in one chapter should not stop others + +### Section Content Generation +- **Target**: Process sections within each chapter in parallel +- **Requirements**: + - Maintain section order within each chapter + - Sections within a chapter can be processed independently + - Chapters still processed sequentially (to maintain order) + - Progress updates should reflect parallel processing + - Errors in one section should not stop others + +## Implementation Strategy + +### Phase 1: Chapter Sections Structure Generation Parallelization + +#### Step 1.1: Extract Single Chapter Processing +- **Create**: `_generateSingleChapterSectionsStructure()` method +- **Purpose**: Process one chapter independently +- **Parameters**: + - `chapter`: Chapter dict + - `chapterIndex`: Index for ordering + - `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata + - `generationHint`: Generation instructions + - `contentPartIds`, `contentPartInstructions`: Content part info + - `contentParts`: Full content parts list + - `userPrompt`: User's original prompt + - `language`: Language for generation + - `parentOperationId`: For progress logging +- **Returns**: None (modifies chapter dict in place) +- **Error Handling**: Logs errors, raises exception to be caught by caller + +#### Step 1.2: Refactor Main Method +- **Modify**: `_generateChapterSectionsStructure()` +- **Changes**: + 1. Collect all chapters with their indices + 2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure` + 3. Use `asyncio.gather()` to execute all tasks in parallel + 4. Process results in order (using `zip` with original order) + 5. 
Handle errors per chapter (don't fail entire operation) + 6. Update progress after each chapter completes + +#### Step 1.3: Progress Reporting +- **Maintain**: Overall progress tracking +- **Update**: Progress after each chapter completes (not sequentially) +- **Format**: "Chapter X/Y completed" or "Chapter X/Y error" + +### Phase 2: Section Content Generation Parallelization + +#### Step 2.1: Extract Single Section Processing +- **Create**: `_processSingleSection()` method +- **Purpose**: Process one section independently +- **Parameters**: + - `section`: Section dict + - `sectionIndex`: Index for ordering + - `totalSections`: Total sections in chapter + - `chapterIndex`: Chapter index + - `totalChapters`: Total chapters + - `chapterId`: Chapter ID + - `chapterOperationId`: Chapter progress operation ID + - `fillOperationId`: Overall fill operation ID + - `contentParts`: Full content parts list + - `userPrompt`: User's original prompt + - `all_sections_list`: All sections for context + - `language`: Language for generation + - `calculateOverallProgress`: Function to calculate overall progress +- **Returns**: `List[Dict[str, Any]]` (elements for the section) +- **Error Handling**: Returns error element instead of raising + +#### Step 2.2: Extract Section Processing Logic +- **Create**: Helper methods for different processing paths: + - `_processSectionAggregation()`: Handle aggregation path (multiple parts) + - `_processSectionGeneration()`: Handle generation without parts (only generationHint) + - `_processSectionParts()`: Handle individual part processing +- **Purpose**: Keep logic organized and reusable + +#### Step 2.3: Refactor Main Method +- **Modify**: `_fillChapterSections()` +- **Changes**: + 1. Keep sequential chapter processing (maintains order) + 2. For each chapter, collect all sections with indices + 3. Create async tasks for each section using `_processSingleSection` + 4. Use `asyncio.gather()` to execute all section tasks in parallel + 5. 
Process results in order (using `zip` with original order) + 6. Assign elements to sections in correct order + 7. Update progress after each section completes + 8. Handle errors per section (don't fail entire chapter) + +#### Step 2.4: Progress Reporting +- **Maintain**: Hierarchical progress tracking +- **Update**: + - Section progress: After each section completes + - Chapter progress: After all sections in chapter complete + - Overall progress: After each section/chapter completes +- **Format**: "Chapter X/Y, Section A/B completed" + +## Key Considerations + +### Order Preservation +- **Chapters**: Must maintain document order → process chapters sequentially +- **Sections**: Must maintain chapter order → process sections sequentially within chapter +- **Solution**: Use `asyncio.gather()` with ordered task list, then `zip` results with original order + +### Error Handling +- **Chapters**: Error in one chapter should not stop others +- **Sections**: Error in one section should not stop others +- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)` + +### Progress Reporting +- **Challenge**: Progress updates happen out of order +- **Solution**: Update progress when each task completes, not sequentially +- **Format**: Show completed count, not sequential position + +### Shared State +- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent) +- **Sections**: Return elements, assign to sections in order (safe, each section is independent) +- **Content Parts**: Read-only, passed to all tasks (safe) + +### Dependencies +- **Chapters**: No dependencies between chapters +- **Sections**: No dependencies between sections (each is self-contained) +- **Solution**: All tasks can run truly in parallel + +## Implementation Steps + +### Step 1: Clean Current Code +1. Ensure current sequential implementation is correct +2. Fix any existing bugs +3. 
Verify all tests pass + +### Step 2: Implement Chapter Parallelization +1. Create `_generateSingleChapterSectionsStructure()` method +2. Extract chapter processing logic +3. Refactor `_generateChapterSectionsStructure()` to use parallel processing +4. Test with single chapter +5. Test with multiple chapters +6. Verify order preservation +7. Verify error handling + +### Step 3: Implement Section Parallelization +1. Create `_processSingleSection()` method +2. Extract section processing logic into helper methods +3. Refactor `_fillChapterSections()` to use parallel processing for sections +4. Test with single section +5. Test with multiple sections +6. Test with multiple chapters +7. Verify order preservation +8. Verify error handling + +### Step 4: Testing & Validation +1. Test with various document structures +2. Test error scenarios +3. Verify progress reporting accuracy +4. Performance testing (compare sequential vs parallel) +5. Verify final output order matches input order + +## Code Structure + +### New Methods to Create + +```python +async def _generateSingleChapterSectionsStructure( + self, + chapter: Dict[str, Any], + chapterIndex: int, + chapterId: str, + chapterLevel: int, + chapterTitle: str, + generationHint: str, + contentPartIds: List[str], + contentPartInstructions: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + language: str, + parentOperationId: str +) -> None: + """Generate sections structure for a single chapter (used for parallel processing).""" + # Extract logic from current sequential loop + # Modify chapter dict in place + # Handle errors internally, raise if critical + +async def _processSingleSection( + self, + section: Dict[str, Any], + sectionIndex: int, + totalSections: int, + chapterIndex: int, + totalChapters: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentParts: List[ContentPart], + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str, + 
calculateOverallProgress: Callable +) -> List[Dict[str, Any]]: + """Process a single section and return its elements.""" + # Extract logic from current sequential loop + # Return elements list + # Return error element on failure (don't raise) + +async def _processSectionAggregation( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentPartIds: List[str], + contentFormats: Dict[str, str], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process section with aggregation (multiple parts together).""" + # Extract aggregation logic + # Return elements list + +async def _processSectionGeneration( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentType: str, + userPrompt: str, + generationHint: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process section generation without content parts (only generationHint).""" + # Extract generation logic + # Return elements list + +async def _processSectionParts( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentPartIds: List[str], + contentFormats: Dict[str, str], + contentParts: List[ContentPart], + contentType: str, + useAiCall: bool, + generationHint: str, + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process individual parts in a section.""" + # Extract individual part processing logic + # Return elements list +``` + +### Modified Methods + +```python +async def 
_generateChapterSectionsStructure( + self, + chapterStructure: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + parentOperationId: str +) -> Dict[str, Any]: + """Generate sections structure for all chapters in parallel.""" + # Collect chapters with indices + # Create tasks + # Execute in parallel + # Process results in order + # Update progress + +async def _fillChapterSections( + self, + chapterStructure: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + fillOperationId: str +) -> Dict[str, Any]: + """Fill sections with content, processing sections in parallel within each chapter.""" + # Process chapters sequentially + # For each chapter, process sections in parallel + # Maintain order + # Update progress +``` + +## Testing Strategy + +### Unit Tests +1. Test `_generateSingleChapterSectionsStructure` independently +2. Test `_processSingleSection` independently +3. Test helper methods independently + +### Integration Tests +1. Test parallel chapter processing with multiple chapters +2. Test parallel section processing with multiple sections +3. Test error handling (one chapter/section fails) +4. Test order preservation + +### Performance Tests +1. Measure sequential vs parallel execution time +2. Verify parallel processing is faster +3. Check resource usage (memory, CPU) + +## Risk Mitigation + +### Risks +1. **Order not preserved**: Use `zip` with original order +2. **Race conditions**: No shared mutable state between tasks +3. **Progress reporting incorrect**: Update progress when tasks complete +4. **Errors not handled**: Use `return_exceptions=True` and check results +5. **Performance degradation**: Test and measure, fallback to sequential if needed + +### Safety Measures +1. Keep sequential implementation as fallback (commented out) +2. Add feature flag to enable/disable parallel processing +3. Extensive logging for debugging +4. Gradual rollout (test with small datasets first) + +## Migration Path + +1. 
**Phase 1**: Implement chapter parallelization, test thoroughly +2. **Phase 2**: Implement section parallelization, test thoroughly +3. **Phase 3**: Enable both in production with monitoring +4. **Phase 4**: Remove sequential fallback code (if stable) + +## Notes + +- All async methods must use `await` correctly +- Progress updates happen asynchronously (may appear out of order in logs) +- Final result order is guaranteed by processing results in order +- Error handling is per-task, not global +- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts) + diff --git a/modules/services/serviceAi/REFACTORING_PLAN.md b/modules/services/serviceAi/REFACTORING_PLAN.md deleted file mode 100644 index 2ce7a717..00000000 --- a/modules/services/serviceAi/REFACTORING_PLAN.md +++ /dev/null @@ -1,126 +0,0 @@ -# Refactoring Plan für mainServiceAi.py - -## Ziel -Aufteilen des 3000-Zeilen-Moduls in überschaubare Submodule (~300-600 Zeilen pro Modul). - -## Vorgeschlagene Struktur - -### Bereits erstellt: -1. ✅ `subResponseParsing.py` - ResponseParser Klasse -2. ✅ `subDocumentIntents.py` - DocumentIntentAnalyzer Klasse - -### Noch zu erstellen: -3. `subContentExtraction.py` - ContentExtractor Klasse - - `extractAndPrepareContent()` (~490 Zeilen) - - `extractTextFromImage()` (~55 Zeilen) - - `processTextContentWithAi()` (~72 Zeilen) - - `_isBinary()` (~10 Zeilen) - -4. `subStructureGeneration.py` - StructureGenerator Klasse - - `generateStructure()` (~60 Zeilen) - - `_buildStructurePrompt()` (~130 Zeilen) - -5. `subStructureFilling.py` - StructureFiller Klasse - - `fillStructure()` (~290 Zeilen) - - `_buildSectionGenerationPrompt()` (~185 Zeilen) - - `_findContentPartById()` (~5 Zeilen) - - `_needsAggregation()` (~20 Zeilen) - -6. 
`subAiCallLooping.py` - AiCallLooper Klasse - - `callAiWithLooping()` (~405 Zeilen) - - `_defineKpisFromPrompt()` (~92 Zeilen) - -## Refactoring-Schritte für mainServiceAi.py - -### Schritt 1: Submodule-Initialisierung erweitern - -```python -def _initializeSubmodules(self): - """Initialize all submodules after aiObjects is ready.""" - if self.aiObjects is None: - raise RuntimeError("aiObjects must be initialized before initializing submodules") - - if self.extractionService is None: - logger.info("Initializing ExtractionService...") - self.extractionService = ExtractionService(self.services) - - # Neue Submodule initialisieren - from modules.services.serviceAi.subResponseParsing import ResponseParser - from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer - from modules.services.serviceAi.subContentExtraction import ContentExtractor - from modules.services.serviceAi.subStructureGeneration import StructureGenerator - from modules.services.serviceAi.subStructureFilling import StructureFiller - - if not hasattr(self, 'responseParser'): - self.responseParser = ResponseParser(self.services) - - if not hasattr(self, 'intentAnalyzer'): - self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self) - - if not hasattr(self, 'contentExtractor'): - self.contentExtractor = ContentExtractor(self.services, self) - - if not hasattr(self, 'structureGenerator'): - self.structureGenerator = StructureGenerator(self.services, self) - - if not hasattr(self, 'structureFiller'): - self.structureFiller = StructureFiller(self.services, self) -``` - -### Schritt 2: Methoden durch Delegation ersetzen - -**Beispiel für Response Parsing:** -```python -# ALT: -def _extractSectionsFromResponse(self, ...): - # 100 Zeilen Code - ... - -# NEU: -def _extractSectionsFromResponse(self, ...): - return self.responseParser.extractSectionsFromResponse(...) 
-``` - -**Beispiel für Document Intents:** -```python -# ALT: -async def _clarifyDocumentIntents(self, ...): - # 100 Zeilen Code - ... - -# NEU: -async def _clarifyDocumentIntents(self, ...): - return await self.intentAnalyzer.clarifyDocumentIntents(...) -``` - -### Schritt 3: Helper-Methoden beibehalten - -Kleine Helper-Methoden bleiben im Hauptmodul: -- `_buildPromptWithPlaceholders()` -- `_getIntentForDocument()` -- `_shouldSkipContentPart()` -- `_determineDocumentName()` - -### Schritt 4: Public API unverändert lassen - -Die öffentliche API (`callAiPlanning`, `callAiContent`) bleibt unverändert. - -## Erwartete Ergebnis-Größen - -- `mainServiceAi.py`: ~800-1000 Zeilen (von 3016) -- `subResponseParsing.py`: ~200 Zeilen ✅ -- `subDocumentIntents.py`: ~300 Zeilen ✅ -- `subContentExtraction.py`: ~600 Zeilen -- `subStructureGeneration.py`: ~200 Zeilen -- `subStructureFilling.py`: ~400 Zeilen -- `subAiCallLooping.py`: ~500 Zeilen - -**Gesamt: ~3000 Zeilen** (gleich, aber besser organisiert) - -## Vorteile - -1. **Übersichtlichkeit**: Jedes Modul hat eine klare Verantwortlichkeit -2. **Wartbarkeit**: Änderungen sind lokalisiert -3. **Testbarkeit**: Module können einzeln getestet werden -4. 
**Wiederverwendbarkeit**: Module können in anderen Kontexten verwendet werden - diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 9839093d..65bae155 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -676,6 +676,7 @@ Respond with ONLY a JSON object in this exact format: ) # Schritt 5D: Fülle Struktur + # Language will be extracted from services (user intention analysis) in fillStructure filledStructure = await self._fillStructure( structure, contentParts or [], diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py index 8ebafd23..bb1824c2 100644 --- a/modules/services/serviceAi/subAiCallLooping.py +++ b/modules/services/serviceAi/subAiCallLooping.py @@ -14,7 +14,7 @@ from typing import Dict, Any, List, Optional, Callable from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState from modules.datamodels.datamodelExtraction import ContentPart -from modules.shared.jsonUtils import buildContinuationContext, extractJsonString +from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler logger = logging.getLogger(__name__) @@ -192,6 +192,38 @@ class AiCallLooper: # Store raw response for continuation (even if broken) lastRawResponse = result + # Check if this is section content generation (has "elements" not "sections") + # Section content generation returns JSON with "elements" array, not document structure with "sections" + isSectionContentGeneration = False + parsedJsonForSection = None + extractedJsonForSection = None + try: + extractedJsonForSection = extractJsonString(result) + parsedJson, parseError, _ = tryParseJson(extractedJsonForSection) + if parseError is None and parsedJson: + 
parsedJsonForSection = parsedJson + # Check if JSON has "elements" (section content) or "sections" (document structure) + if isinstance(parsedJson, dict): + if "elements" in parsedJson: + isSectionContentGeneration = True + elif isinstance(parsedJson, list) and len(parsedJson) > 0: + # Check if it's a list of elements (section content format) + if isinstance(parsedJson[0], dict) and "type" in parsedJson[0]: + isSectionContentGeneration = True + except Exception: + pass + + if isSectionContentGeneration: + # This is section content generation - return the JSON directly + # No need to extract sections, just return the complete JSON string + logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) + # Write final result + final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result) + self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result") + return final_json + # Extract sections from response (handles both valid and broken JSON) # Only for document generation (JSON responses) # CRITICAL: Pass allSections and accumulationState to enable string accumulation diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py index 6e3377a1..75642b48 100644 --- a/modules/services/serviceAi/subStructureFilling.py +++ b/modules/services/serviceAi/subStructureFilling.py @@ -11,6 +11,7 @@ Handles filling document structure with content, including: import json import logging import copy +import asyncio from typing import Dict, Any, List, Optional from modules.datamodels.datamodelExtraction import ContentPart @@ -27,12 +28,27 @@ class StructureFiller: self.services = services self.aiService = aiService + def _getUserLanguage(self) -> str: + """Get user language for document 
generation""" + try: + if self.services: + # Prefer detected language if available (from user intention analysis) + if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage: + return self.services.currentUserLanguage + # Fallback to user's preferred language + elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'): + return self.services.user.language + except Exception: + pass + return 'en' # Default fallback + async def fillStructure( self, structure: Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: Optional[str] = None ) -> Dict[str, Any]: """ Phase 5D: Chapter-Content-Generierung (Zwei-Phasen-Ansatz). @@ -45,6 +61,7 @@ class StructureFiller: contentParts: Alle vorbereiteten ContentParts userPrompt: User-Anfrage parentOperationId: Parent Operation-ID für ChatLog-Hierarchie + language: Language identified from user intention analysis (e.g., "de", "en", "fr") Returns: Gefüllte Struktur mit elements in jeder Section (nach Flattening) @@ -64,6 +81,13 @@ class StructureFiller: logger.error(error_msg) raise ValueError(error_msg) + # Get language from services (user intention analysis) or parameter + if language is None: + language = self._getUserLanguage() + logger.debug(f"Using language from services (user intention analysis): {language}") + else: + logger.debug(f"Using provided language parameter: {language}") + # Starte ChatLog mit Parent-Referenz chapterCount = sum(len(doc.get("chapters", [])) for doc in structure.get("documents", [])) self.services.chat.progressLogStart( @@ -79,12 +103,12 @@ class StructureFiller: # Phase 5D.1: Sections-Struktur für jedes Chapter generieren filledStructure = await self._generateChapterSectionsStructure( - filledStructure, contentParts, userPrompt, fillOperationId + filledStructure, contentParts, userPrompt, fillOperationId, language ) # Phase 5D.2: Sections mit ContentParts 
füllen filledStructure = await self._fillChapterSections( - filledStructure, contentParts, userPrompt, fillOperationId + filledStructure, contentParts, userPrompt, fillOperationId, language ) # Flattening: Chapters zu Sections konvertieren @@ -103,19 +127,133 @@ class StructureFiller: logger.error(f"Error in fillStructure: {str(e)}") raise + async def _generateSingleChapterSectionsStructure( + self, + chapter: Dict[str, Any], + chapterIndex: int, + chapterId: str, + chapterLevel: int, + chapterTitle: str, + generationHint: str, + contentPartIds: List[str], + contentPartInstructions: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + language: str, + parentOperationId: str, + totalChapters: int + ) -> None: + """ + Generate sections structure for a single chapter (used for parallel processing). + Modifies chapter dict in place. + """ + try: + # Update progress for chapter structure generation + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}" + ) + + chapterPrompt = self._buildChapterSectionsStructurePrompt( + chapterId=chapterId, + chapterLevel=chapterLevel, + chapterTitle=chapterTitle, + generationHint=generationHint, + contentPartIds=contentPartIds, + contentPartInstructions=contentPartInstructions, + contentParts=contentParts, + userPrompt=userPrompt, + language=language + ) + + # AI-Call für Chapter-Struktur-Generierung + # Note: Debug logging is handled by callAiPlanning + aiResponse = await self.aiService.callAiPlanning( + prompt=chapterPrompt, + debugType=f"chapter_structure_{chapterId}" + ) + + sectionsStructure = json.loads( + self.services.utils.jsonExtractString(aiResponse) + ) + + chapter["sections"] = sectionsStructure.get("sections", []) + + # Setze useAiCall Flag (falls nicht von AI gesetzt) + # WICHTIG: useAiCall kann nur true sein, wenn mindestens ein 
ContentPart Format "extracted" hat! + # "object" und "reference" Formate werden direkt als Elemente hinzugefügt, benötigen kein AI. + for section in chapter["sections"]: + if "useAiCall" not in section: + contentType = section.get("content_type", "paragraph") + sectionContentPartIds = section.get("contentPartIds", []) + + # Prüfe ob mindestens ein ContentPart Format "extracted" hat + hasExtractedPart = False + for partId in sectionContentPartIds: + part = self._findContentPartById(partId, contentParts) + if part: + contentFormat = part.metadata.get("contentFormat", "unknown") + if contentFormat == "extracted": + hasExtractedPart = True + break + + # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind + useAiCall = False + if hasExtractedPart: + # Prüfe ob Transformation nötig ist + useAiCall = contentType != "paragraph" + + # Prüfe contentPartInstructions für Transformation + if not useAiCall: + for partId in sectionContentPartIds: + instruction = contentPartInstructions.get(partId, {}).get("instruction", "") + if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]: + useAiCall = True + break + + section["useAiCall"] = useAiCall + logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})") + + # Update progress after chapter completion + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Chapter {chapterIndex}/{totalChapters} completed: {chapterTitle}" + ) + + except Exception as e: + logger.error(f"Error generating sections structure for chapter {chapterId}: {str(e)}") + # Set empty sections on error + chapter["sections"] = [] + # Update progress even on error + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Chapter 
{chapterIndex}/{totalChapters} error: {chapterTitle}" + ) + raise + async def _generateChapterSectionsStructure( self, chapterStructure: Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: str ) -> Dict[str, Any]: """ - Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content). + Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content) in parallel. Sections enthalten: content_type, contentPartIds, generationHint, useAiCall """ # Count total chapters for progress tracking totalChapters = sum(len(doc.get("chapters", [])) for doc in chapterStructure.get("documents", [])) + + # Collect all chapters with their indices for parallel processing + chapterTasks = [] chapterIndex = 0 for doc in chapterStructure.get("documents", []): @@ -128,15 +266,10 @@ class StructureFiller: contentPartIds = chapter.get("contentPartIds", []) contentPartInstructions = chapter.get("contentPartInstructions", {}) - # Update progress for chapter structure generation - progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 - self.services.chat.progressLogUpdate( - parentOperationId, - progress, - f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}" - ) - - chapterPrompt = self._buildChapterSectionsStructurePrompt( + # Create task for parallel processing + task = self._generateSingleChapterSectionsStructure( + chapter=chapter, + chapterIndex=chapterIndex, chapterId=chapterId, chapterLevel=chapterLevel, chapterTitle=chapterTitle, @@ -144,69 +277,926 @@ class StructureFiller: contentPartIds=contentPartIds, contentPartInstructions=contentPartInstructions, contentParts=contentParts, - userPrompt=userPrompt + userPrompt=userPrompt, + language=language, + parentOperationId=parentOperationId, + totalChapters=totalChapters ) - - # AI-Call für Chapter-Struktur-Generierung - # Note: Debug logging is handled by callAiPlanning - aiResponse = await 
self.aiService.callAiPlanning( - prompt=chapterPrompt, - debugType=f"chapter_structure_{chapterId}" - ) - - sectionsStructure = json.loads( - self.services.utils.jsonExtractString(aiResponse) - ) - - chapter["sections"] = sectionsStructure.get("sections", []) - - # Setze useAiCall Flag (falls nicht von AI gesetzt) - # WICHTIG: useAiCall kann nur true sein, wenn mindestens ein ContentPart Format "extracted" hat! - # "object" und "reference" Formate werden direkt als Elemente hinzugefügt, benötigen kein AI. - for section in chapter["sections"]: - if "useAiCall" not in section: - contentType = section.get("content_type", "paragraph") - contentPartIds = section.get("contentPartIds", []) - - # Prüfe ob mindestens ein ContentPart Format "extracted" hat - hasExtractedPart = False - for partId in contentPartIds: - part = self._findContentPartById(partId, contentParts) - if part: - contentFormat = part.metadata.get("contentFormat", "unknown") - if contentFormat == "extracted": - hasExtractedPart = True - break - - # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind - useAiCall = False - if hasExtractedPart: - # Prüfe ob Transformation nötig ist - useAiCall = contentType != "paragraph" - - # Prüfe contentPartInstructions für Transformation - if not useAiCall: - for partId in contentPartIds: - instruction = contentPartInstructions.get(partId, {}).get("instruction", "") - if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]: - useAiCall = True - break - - section["useAiCall"] = useAiCall - logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})") + chapterTasks.append((chapterIndex, chapter, task)) + + # Execute all chapter tasks in parallel + if chapterTasks: + # Create list of tasks (without indices for gather) + tasks = [task for _, _, task in chapterTasks] + + # Execute in parallel with error handling + results = await 
asyncio.gather(*tasks, return_exceptions=True) + + # Process results in order and handle errors + for (originalIndex, originalChapter, _), result in zip(chapterTasks, results): + if isinstance(result, Exception): + logger.error(f"Error processing chapter {originalChapter.get('id')}: {str(result)}") + # Chapter already has empty sections set by _generateSingleChapterSectionsStructure + # Continue processing other chapters return chapterStructure + async def _processAiResponseForSection( + self, + aiResponse: Any, + contentType: str, + operationType: OperationTypeEnum, + sectionId: str, + generationHint: str, + generatedElements: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Helper method to process AI response and extract elements. + Handles both IMAGE_GENERATE and DATA_ANALYSE operation types. + """ + elements = [] + + # Handle IMAGE_GENERATE differently - returns image data directly + if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: + import base64 + base64Data = "" + + # Convert image data to base64 string if needed + if isinstance(aiResponse.content, bytes): + base64Data = base64.b64encode(aiResponse.content).decode('utf-8') + elif isinstance(aiResponse.content, str): + # Check if it's already a JSON structure + try: + jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) + if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": + elements.append(jsonContent) + logger.debug("AI returned proper JSON image structure") + base64Data = None # Signal that image was already processed + elif isinstance(jsonContent, list) and len(jsonContent) > 0: + if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": + elements.extend(jsonContent) + logger.debug("AI returned proper JSON image structure in list") + base64Data = None # Signal that image was already processed + else: + base64Data = "" # Continue with normal processing + else: + base64Data = "" # Continue 
with normal processing + except (json.JSONDecodeError, ValueError, AttributeError): + base64Data = "" # Will be processed below + + # Process base64 if not already handled above + if base64Data is None: + # Already processed as JSON, skip base64 processing + pass + elif aiResponse.content.startswith("data:image/"): + # Extract base64 from data URI + base64Data = aiResponse.content.split(",", 1)[1] + else: + content_stripped = aiResponse.content.strip() + if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): + base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") + else: + base64Data = aiResponse.content + else: + base64Data = "" + + # Always create proper JSON structure for images (if not already processed) + if base64Data is None: + # Image already processed as JSON, skip + pass + elif base64Data: + elements.append({ + "type": "image", + "content": { + "base64Data": base64Data, + "altText": generationHint or "Generated image", + "caption": "" + } + }) + logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") + else: + logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}") + elements.append({ + "type": "error", + "message": f"Image generation returned empty or invalid content", + "sectionId": sectionId + }) + else: + # For non-image content: Use already parsed elements from _callAiWithLooping + if generatedElements: + elements.extend(generatedElements) + else: + # Fallback: Try to parse JSON response directly + try: + fallbackElements = json.loads( + self.services.utils.jsonExtractString(aiResponse.content) + ) + if isinstance(fallbackElements, list): + elements.extend(fallbackElements) + elif isinstance(fallbackElements, dict) and "elements" in fallbackElements: + elements.extend(fallbackElements["elements"]) + elif isinstance(fallbackElements, 
dict) and fallbackElements.get("type"): + elements.append(fallbackElements) + except (json.JSONDecodeError, ValueError) as json_error: + logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") + elements.append({ + "type": "error", + "message": f"Failed to parse JSON response: {str(json_error)}", + "sectionId": sectionId + }) + + return elements + + async def _processSingleSection( + self, + section: Dict[str, Any], + sectionIndex: int, + totalSections: int, + chapterIndex: int, + totalChapters: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentParts: List[ContentPart], + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str, + calculateOverallProgress: callable + ) -> List[Dict[str, Any]]: + """ + Process a single section and return its elements. + Used for parallel processing of sections within a chapter. + """ + sectionId = section.get("id") + sectionTitle = section.get("title", sectionId) + contentPartIds = section.get("contentPartIds", []) + contentFormats = section.get("contentFormats", {}) + generationHint = section.get("generationHint") or section.get("generation_hint") + contentType = section.get("content_type", "paragraph") + useAiCall = section.get("useAiCall", False) + + # Update overall progress at start of section + overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections) + self.services.chat.progressLogUpdate( + fillOperationId, + overallProgress, + f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}" + ) + + # WICHTIG: Wenn keine ContentParts vorhanden sind UND kein generationHint, kann kein AI-Call gemacht werden + if len(contentPartIds) == 0 and not generationHint: + useAiCall = False + logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False") + elif len(contentPartIds) == 0 and generationHint and not useAiCall: + 
useAiCall = True + logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)") + + elements = [] + + # Prüfe ob Aggregation nötig ist + needsAggregation = self._needsAggregation( + contentType=contentType, + contentPartCount=len(contentPartIds) + ) + + logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}") + + try: + if needsAggregation and useAiCall: + # Aggregation: Alle Parts zusammen verarbeiten + sectionParts = [ + self._findContentPartById(pid, contentParts) + for pid in contentPartIds + ] + sectionParts = [p for p in sectionParts if p is not None] + + if sectionParts: + # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt) + extractedParts = [ + p for p in sectionParts + if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted" + ] + nonExtractedParts = [ + p for p in sectionParts + if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted" + ] + + # Verarbeite non-extracted Parts separat (reference, object) + for part in nonExtractedParts: + contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat")) + + if contentFormat == "reference": + elements.append({ + "type": "reference", + "documentReference": part.metadata.get("documentReference"), + "label": part.metadata.get("usageHint", part.label) + }) + elif contentFormat == "object": + if part.typeGroup == "image": + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + elements.append({ + "type": part.typeGroup, + "content": { + "data": part.data, + "mimeType": part.mimeType, + "label": part.metadata.get("usageHint", part.label) + } + }) + + # Aggregiere extracted Parts 
mit AI + if extractedParts: + logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI") + isAggregation = True + generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=extractedParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation (Aggregation)", + f"Section {sectionIndex + 1}/{totalSections}", + f"{sectionTitle} ({len(extractedParts)} parts)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) > maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + else: + async def buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: 
str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. + +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. + +Return ONLY the continuation JSON (starting from the incomplete element). 
+The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": extractedParts, + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=extractedParts + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", []) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") + self.services.utils.writeDebugFile( + aiResponse.content 
if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + # Process AI response + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" + ) + + else: + # Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts + if len(contentPartIds) == 0 and useAiCall and generationHint: + # Generate content from scratch using only generationHint + logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only") + generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=[], + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=False, + 
language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation", + f"Section {sectionIndex + 1}/{totalSections}", + f"{sectionTitle} (from generationHint)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) > maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + else: + isAggregation = False + + async def buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, 
+ language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. + +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. + +Return ONLY the continuation JSON (starting from the incomplete element). +The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": [], + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=[] + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", []) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + 
else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") + self.services.utils.writeDebugFile( + aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} 
completed (with errors)" + ) + + # Einzelverarbeitung: Jeder Part einzeln + for partId in contentPartIds: + part = self._findContentPartById(partId, contentParts) + if not part: + continue + + contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat")) + + if contentFormat == "reference": + elements.append({ + "type": "reference", + "documentReference": part.metadata.get("documentReference"), + "label": part.metadata.get("usageHint", part.label) + }) + + elif contentFormat == "object": + if part.typeGroup == "image": + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + elements.append({ + "type": part.typeGroup, + "content": { + "data": part.data, + "mimeType": part.mimeType, + "label": part.metadata.get("usageHint", part.label) + } + }) + + elif contentFormat == "extracted": + if useAiCall and generationHint: + # AI-Call mit einzelnen ContentPart + logger.debug(f"Processing section {sectionId}: Single extracted part with AI call") + generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=[part], + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=False, + language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation", + f"Section {sectionIndex + 1}/{totalSections}", + f"{sectionTitle} (single part)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt") + + 
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) > maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + else: + isAggregation = False + + async def buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. + +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. 
+ +Return ONLY the continuation JSON (starting from the incomplete element). +The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": [part], + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=[part] + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", []) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") 
+ self.services.utils.writeDebugFile( + aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" + ) + else: + # Füge extrahierten Content direkt hinzu (kein AI-Call) + if part.typeGroup == "image": + logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call") + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + logger.debug(f"Processing section {sectionId}: Single extracted TEXT part WITHOUT AI call") + elements.append({ + "type": "extracted_text", + 
"content": part.data, + "source": part.metadata.get("documentId"), + "extractionPrompt": part.metadata.get("extractionPrompt") + }) + + # Update progress after section completion + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections) + self.services.chat.progressLogUpdate( + fillOperationId, + overallProgress, + f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + logger.error(f"Unexpected error processing section {sectionId}: {str(e)}") + elements.append({ + "type": "error", + "message": f"Unexpected error processing section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + + return elements + async def _fillChapterSections( self, chapterStructure: Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: str ) -> Dict[str, Any]: """ Phase 5D.2: Füllt Sections mit ContentParts. 
""" + # Sammle alle Sections für Kontext-Informationen (für alle Sections) all_sections_list = [] for doc in chapterStructure.get("documents", []): @@ -252,768 +1242,48 @@ class StructureFiller: parentOperationId=fillOperationId ) - # Process sections within chapter + # Process sections within chapter in parallel + sectionTasks = [] for sectionIndex, section in enumerate(sections): - sectionId = section.get("id") - sectionTitle = section.get("title", sectionId) - contentPartIds = section.get("contentPartIds", []) - contentFormats = section.get("contentFormats", {}) - # Check both camelCase and snake_case for generationHint - generationHint = section.get("generationHint") or section.get("generation_hint") - contentType = section.get("content_type", "paragraph") - useAiCall = section.get("useAiCall", False) - - # Update overall progress at start of section - overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections) - self.services.chat.progressLogUpdate( - fillOperationId, - overallProgress, - f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}" + # Create task for parallel processing + task = self._processSingleSection( + section=section, + sectionIndex=sectionIndex, + totalSections=totalSections, + chapterIndex=chapterIndex, + totalChapters=totalChapters, + chapterId=chapterId, + chapterOperationId=chapterOperationId, + fillOperationId=fillOperationId, + contentParts=contentParts, + userPrompt=userPrompt, + all_sections_list=all_sections_list, + language=language, + calculateOverallProgress=calculateOverallProgress ) + sectionTasks.append((sectionIndex, section, task)) + + # Execute all section tasks in parallel + if sectionTasks: + # Create list of tasks (without indices for gather) + tasks = [task for _, _, task in sectionTasks] - # WICHTIG: Wenn keine ContentParts vorhanden sind UND kein generationHint, kann kein AI-Call gemacht werden - # Aber: Wenn generationHint 
vorhanden ist, SOLLTE AI verwendet werden, auch wenn useAiCall=false gesetzt ist - # (z.B. wenn AI die Struktur generiert hat, aber useAiCall falsch gesetzt wurde) - if len(contentPartIds) == 0 and not generationHint: - useAiCall = False - logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False") - elif len(contentPartIds) == 0 and generationHint and not useAiCall: - # Override: If there's a generationHint but no content parts, we should use AI - # This handles cases where structure generation set useAiCall=false incorrectly - useAiCall = True - logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)") + # Execute in parallel with error handling + results = await asyncio.gather(*tasks, return_exceptions=True) - elements = [] - - # Prüfe ob Aggregation nötig ist - needsAggregation = self._needsAggregation( - contentType=contentType, - contentPartCount=len(contentPartIds) - ) - - logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}") - - if needsAggregation and useAiCall: - # Aggregation: Alle Parts zusammen verarbeiten - sectionParts = [ - self._findContentPartById(pid, contentParts) - for pid in contentPartIds - ] - sectionParts = [p for p in sectionParts if p is not None] - - if sectionParts: - # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt) - extractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted" - ] - nonExtractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted" - ] - - # Verarbeite non-extracted Parts separat (reference, object) - for part in nonExtractedParts: - contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat")) 
- - if contentFormat == "reference": - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - elif contentFormat == "object": - # Nested content structure for objects - if part.typeGroup == "image": - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - "altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - elements.append({ - "type": part.typeGroup, - "content": { - "data": part.data, - "mimeType": part.mimeType, - "label": part.metadata.get("usageHint", part.label) - } - }) - - # Aggregiere extracted Parts mit AI - if extractedParts: - logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=extractedParts, # ALLE PARTS für Aggregation! - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=True - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation (Aggregation)", - f"Section {sectionIndex + 1}/{totalSections}", - f"{sectionTitle} ({len(extractedParts)} parts)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, 
"Calling AI for content generation") - - # Verwende callAi für ContentParts-Unterstützung (nicht callAiPlanning!) - # Use IMAGE_GENERATE for image content type - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks - contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else extractedParts - request = AiCallRequest( - prompt=generationPrompt, - contentParts=contentPartsForCall, # Empty for IMAGE_GENERATE, all parts for others - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data 
to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # Check if it's already a JSON structure - try: - # Try to parse as JSON first - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - # If it's already a proper JSON structure with image element, use it - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - # Check if first element is an image - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - else: - base64Data = "" # Continue with normal processing - except (json.JSONDecodeError, ValueError, AttributeError): - # Not JSON, treat as base64 string or data URI - base64Data = "" # Will be processed below - - # Process base64 if not already handled above - if base64Data is None: - # Already processed as JSON, skip base64 processing - pass - elif aiResponse.content.startswith("data:image/"): - # Extract base64 from data URI - base64Data = aiResponse.content.split(",", 1)[1] - else: - # Check if it looks like base64 (alphanumeric + / + =) - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - # Looks like base64, use it - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", 
"").replace(" ", "") - else: - base64Data = aiResponse.content - else: - base64Data = "" - - # Always create proper JSON structure for images (if not already processed) - if base64Data is None: - # Image already processed as JSON, skip - pass - elif base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty or invalid content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - # Single element in dict format - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - # Try to extract any image data that might be in the response - if contentType == "image": - # Check if response content might be base64 image data - content_str = str(aiResponse.content) - if len(content_str) > 100: - elements.append({ - "type": "error", - "message": f"Failed to parse image generation response: {str(json_error)}", - "sectionId": sectionId - }) - else: - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - 
self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) - self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - # NICHT raise - Section wird mit Fehlermeldung gerendert - - else: - # Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts - # Handle case where no content parts but generationHint exists (e.g., Executive Summary) - if len(contentPartIds) == 0 and useAiCall and generationHint: - # Generate content from scratch using only generationHint - logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=[], # NO PARTS - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=False - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation", - f"Section 
{sectionIndex + 1}/{totalSections}", - f"{sectionTitle} (from generationHint)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - - # Verwende callAi ohne ContentParts - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - request = AiCallRequest( - prompt=generationPrompt, - contentParts=[], # NO PARTS - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle 
IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # Check if it's already a JSON structure - try: - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - else: - base64Data = "" # Continue with normal processing - except (json.JSONDecodeError, ValueError, AttributeError): - base64Data = "" # Will be processed below - - # Process base64 if not already handled above - if base64Data is None: - # Already processed as JSON, skip base64 processing - pass - elif aiResponse.content.startswith("data:image/"): - # Extract base64 from data URI - base64Data = aiResponse.content.split(",", 1)[1] - else: - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") - else: - base64Data = aiResponse.content - else: - 
base64Data = "" - - # Always create proper JSON structure for images (if not already processed) - if base64Data is None: - # Image already processed as JSON, skip - pass - elif base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) 
- self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - - # Einzelverarbeitung: Jeder Part einzeln - for partId in contentPartIds: - part = self._findContentPartById(partId, contentParts) - if not part: - continue - - contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat")) - - if contentFormat == "reference": - # Füge Dokument-Referenz hinzu - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - - elif contentFormat == "object": - # Füge base64 Object hinzu (nested in content structure) - if part.typeGroup == "image": - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - "altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - # For other object types, use generic structure - elements.append({ - "type": part.typeGroup, - "content": { - "data": part.data, - "mimeType": part.mimeType, - "label": part.metadata.get("usageHint", part.label) - } - }) - - elif contentFormat == "extracted": - # WICHTIG: Prüfe sowohl useAiCall als auch generationHint - if useAiCall and generationHint: - # AI-Call mit einzelnen ContentPart - logger.debug(f"Processing section {sectionId}: Single extracted part with AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)})") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - 
contentParts=[part], # EIN PART - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=False - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation", - f"Section {sectionIndex + 1}/{totalSections}", - f"{sectionTitle} (single part)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - - # Verwende callAi für ContentParts-Unterstützung - # Use IMAGE_GENERATE for image content type - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks - contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else [part] - request = AiCallRequest( - prompt=generationPrompt, - 
contentParts=contentPartsForCall, - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # Check if it's already a JSON structure - try: - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - # Skip remaining image processing, but continue with progress updates - base64Data = None # Signal that image was already processed - else: - base64Data = "" # Continue with normal processing - except (json.JSONDecodeError, 
ValueError, AttributeError): - base64Data = "" # Will be processed below - - # Process base64 if not already handled above - if base64Data is None: - # Already processed as JSON, skip base64 processing - pass - elif aiResponse.content.startswith("data:image/"): - # Extract base64 from data URI - base64Data = aiResponse.content.split(",", 1)[1] - else: - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") - else: - base64Data = aiResponse.content - else: - base64Data = "" - - # Always create proper JSON structure for images (if not already processed) - if base64Data is None: - # Image already processed as JSON, skip - pass - elif base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: 
{str(json_error)}") - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) - self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - # NICHT raise - Section wird mit Fehlermeldung gerendert - else: - # Füge extrahierten Content direkt hinzu (kein AI-Call) - # CRITICAL: Check part typeGroup to determine correct element type - if part.typeGroup == "image": - # Image content should be added as image element, not extracted_text - logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call - adding as image element") - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - "altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - # Text content - add as extracted_text element - logger.debug(f"Processing section {sectionId}: Single extracted TEXT part WITHOUT AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)}) 
- adding extracted text directly") - elements.append({ - "type": "extracted_text", - "content": part.data, - "source": part.metadata.get("documentId"), - "extractionPrompt": part.metadata.get("extractionPrompt") - }) - - # Assign elements to section (for all processing paths) - section["elements"] = elements - - # Update chapter progress after section completion (for all sections, including non-AI) - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - # Update overall progress after section completion - overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections) - self.services.chat.progressLogUpdate( - fillOperationId, - overallProgress, - f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed" - ) + # Process results in order and assign elements to sections + for (originalIndex, originalSection, _), result in zip(sectionTasks, results): + if isinstance(result, Exception): + logger.error(f"Error processing section {originalSection.get('id')}: {str(result)}") + # Set error element + originalSection["elements"] = [{ + "type": "error", + "message": f"Error processing section: {str(result)}", + "sectionId": originalSection.get("id") + }] + else: + # Assign elements to section in correct order + originalSection["elements"] = result # Finish chapter operation after all sections processed self.services.chat.progressLogFinish(chapterOperationId, True) @@ -1153,7 +1423,8 @@ class StructureFiller: contentPartIds: List[str], contentPartInstructions: Dict[str, Any], contentParts: List[ContentPart], - userPrompt: str + userPrompt: str, + language: str = "en" ) -> str: """Baue Prompt für Chapter-Sections-Struktur-Generierung.""" # Baue ContentParts-Index (nur IDs, keine Previews!) 
@@ -1176,6 +1447,8 @@ class StructureFiller: prompt = f"""TASK: Generate Chapter Sections Structure +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. + CHAPTER: {chapterTitle} (Level {chapterLevel}, ID: {chapterId}) GENERATION HINT: {generationHint} @@ -1245,7 +1518,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th generationHint: str, allSections: Optional[List[Dict[str, Any]]] = None, sectionIndex: Optional[int] = None, - isAggregation: bool = False + isAggregation: bool = False, + language: str = "en" ) -> str: """Baue Prompt für Section-Generierung mit vollständigem Kontext.""" # Filtere None-Werte @@ -1344,6 +1618,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th if isAggregation: prompt = f"""# TASK: Generate Section Content (Aggregation) +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. + ## SECTION METADATA - Section ID: {sectionId} - Content Type: {contentType} @@ -1392,6 +1668,8 @@ CRITICAL: else: prompt = f"""# TASK: Generate Section Content +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. 
+ ## SECTION METADATA - Section ID: {sectionId} - Content Type: {contentType} diff --git a/modules/services/serviceAi/subStructureGeneration.py b/modules/services/serviceAi/subStructureGeneration.py index d3b46e0e..f4a26b5b 100644 --- a/modules/services/serviceAi/subStructureGeneration.py +++ b/modules/services/serviceAi/subStructureGeneration.py @@ -76,7 +76,30 @@ class StructureGenerator: ) # Parse Struktur - structure = json.loads(self.services.utils.jsonExtractString(aiResponse)) + # Use tryParseJson which handles malformed JSON and unterminated strings + extractedJson = self.services.utils.jsonExtractString(aiResponse) + parsedJson, parseError, cleanedJson = self.services.utils.jsonTryParse(extractedJson) + + if parseError is not None: + # Try to repair broken JSON (handles unterminated strings, incomplete structures, etc.) + logger.warning(f"Initial JSON parsing failed: {str(parseError)}. Attempting repair...") + from modules.shared import jsonUtils + repairedJson = jsonUtils.repairBrokenJson(extractedJson) + if repairedJson: + # Try parsing repaired JSON + parsedJson, parseError, _ = self.services.utils.jsonTryParse(json.dumps(repairedJson)) + if parseError is None: + logger.info("Successfully repaired and parsed JSON structure") + structure = parsedJson + else: + logger.error(f"Failed to parse repaired JSON: {str(parseError)}") + raise ValueError(f"Failed to parse JSON structure after repair: {str(parseError)}") + else: + logger.error(f"Failed to repair JSON. 
Parse error: {str(parseError)}") + logger.error(f"Cleaned JSON preview (first 500 chars): {cleanedJson[:500]}") + raise ValueError(f"Failed to parse JSON structure: {str(parseError)}") + else: + structure = parsedJson # ChatLog abschließen self.services.chat.progressLogFinish(structureOperationId, True) @@ -145,11 +168,17 @@ class StructureGenerator: if not contentPartsIndex: contentPartsIndex = "\n(No content parts available)" + # Extract language from user prompt or default to "en" (can be detected from userPrompt) + # For now, default to "en" - can be enhanced with language detection later + language = "en" # Default language + prompt = f"""USER REQUEST (for context): ``` {userPrompt} ``` +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. + AVAILABLE CONTENT PARTS: {contentPartsIndex}