diff --git a/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md new file mode 100644 index 00000000..d8b55298 --- /dev/null +++ b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md @@ -0,0 +1,376 @@ +# Parallel Processing Refactoring Concept + +## Current State (Sequential) + +### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`) +- **Current**: Processes chapters sequentially, one after another +- **Flow**: + 1. Iterate through documents + 2. For each document, iterate through chapters + 3. For each chapter, generate sections structure using AI + 4. Update progress after each chapter + +### Section Content Generation (`_fillChapterSections`) +- **Current**: Processes chapters sequentially, sections within each chapter sequentially +- **Flow**: + 1. Iterate through documents + 2. For each document, iterate through chapters + 3. For each chapter, iterate through sections + 4. For each section, generate content using AI + 5. 
Update progress after each section + +## Desired State (Parallel) + +### Chapter Sections Structure Generation +- **Target**: Process all chapters in parallel +- **Requirements**: + - Maintain chapter order in final result + - Each chapter can be processed independently + - Progress updates should reflect parallel processing + - Errors in one chapter should not stop others + +### Section Content Generation +- **Target**: Process sections within each chapter in parallel +- **Requirements**: + - Maintain section order within each chapter + - Sections within a chapter can be processed independently + - Chapters still processed sequentially (to maintain order) + - Progress updates should reflect parallel processing + - Errors in one section should not stop others + +## Implementation Strategy + +### Phase 1: Chapter Sections Structure Generation Parallelization + +#### Step 1.1: Extract Single Chapter Processing +- **Create**: `_generateSingleChapterSectionsStructure()` method +- **Purpose**: Process one chapter independently +- **Parameters**: + - `chapter`: Chapter dict + - `chapterIndex`: Index for ordering + - `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata + - `generationHint`: Generation instructions + - `contentPartIds`, `contentPartInstructions`: Content part info + - `contentParts`: Full content parts list + - `userPrompt`: User's original prompt + - `language`: Language for generation + - `parentOperationId`: For progress logging +- **Returns**: None (modifies chapter dict in place) +- **Error Handling**: Logs errors, raises exception to be caught by caller + +#### Step 1.2: Refactor Main Method +- **Modify**: `_generateChapterSectionsStructure()` +- **Changes**: + 1. Collect all chapters with their indices + 2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure` + 3. Use `asyncio.gather()` to execute all tasks in parallel + 4. Process results in order (using `zip` with original order) + 5. 
Handle errors per chapter (don't fail entire operation) + 6. Update progress after each chapter completes + +#### Step 1.3: Progress Reporting +- **Maintain**: Overall progress tracking +- **Update**: Progress after each chapter completes (not sequentially) +- **Format**: "Chapter X/Y completed" or "Chapter X/Y error" + +### Phase 2: Section Content Generation Parallelization + +#### Step 2.1: Extract Single Section Processing +- **Create**: `_processSingleSection()` method +- **Purpose**: Process one section independently +- **Parameters**: + - `section`: Section dict + - `sectionIndex`: Index for ordering + - `totalSections`: Total sections in chapter + - `chapterIndex`: Chapter index + - `totalChapters`: Total chapters + - `chapterId`: Chapter ID + - `chapterOperationId`: Chapter progress operation ID + - `fillOperationId`: Overall fill operation ID + - `contentParts`: Full content parts list + - `userPrompt`: User's original prompt + - `all_sections_list`: All sections for context + - `language`: Language for generation + - `calculateOverallProgress`: Function to calculate overall progress +- **Returns**: `List[Dict[str, Any]]` (elements for the section) +- **Error Handling**: Returns error element instead of raising + +#### Step 2.2: Extract Section Processing Logic +- **Create**: Helper methods for different processing paths: + - `_processSectionAggregation()`: Handle aggregation path (multiple parts) + - `_processSectionGeneration()`: Handle generation without parts (only generationHint) + - `_processSectionParts()`: Handle individual part processing +- **Purpose**: Keep logic organized and reusable + +#### Step 2.3: Refactor Main Method +- **Modify**: `_fillChapterSections()` +- **Changes**: + 1. Keep sequential chapter processing (maintains order) + 2. For each chapter, collect all sections with indices + 3. Create async tasks for each section using `_processSingleSection` + 4. Use `asyncio.gather()` to execute all section tasks in parallel + 5. 
Process results in order (using `zip` with original order) + 6. Assign elements to sections in correct order + 7. Update progress after each section completes + 8. Handle errors per section (don't fail entire chapter) + +#### Step 2.4: Progress Reporting +- **Maintain**: Hierarchical progress tracking +- **Update**: + - Section progress: After each section completes + - Chapter progress: After all sections in chapter complete + - Overall progress: After each section/chapter completes +- **Format**: "Chapter X/Y, Section A/B completed" + +## Key Considerations + +### Order Preservation +- **Chapters**: Final result must preserve document order → gather chapter tasks in an ordered list and consume results in that same order (not completion order) +- **Sections**: Final result must preserve in-chapter order → gather section tasks in order and assign returned elements back by their original index +- **Solution**: Use `asyncio.gather()` with ordered task list, then `zip` results with original order + +### Error Handling +- **Chapters**: Error in one chapter should not stop others +- **Sections**: Error in one section should not stop others +- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)` + +### Progress Reporting +- **Challenge**: Progress updates happen out of order +- **Solution**: Update progress when each task completes, not sequentially +- **Format**: Show completed count, not sequential position + +### Shared State +- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent) +- **Sections**: Return elements, assign to sections in order (safe, each section is independent) +- **Content Parts**: Read-only, passed to all tasks (safe) + +### Dependencies +- **Chapters**: No dependencies between chapters +- **Sections**: No dependencies between sections (each is self-contained) +- **Solution**: All tasks can run truly in parallel + +## Implementation Steps + +### Step 1: Clean Current Code +1. Ensure current sequential implementation is correct +2. Fix any existing bugs +3. 
Verify all tests pass + +### Step 2: Implement Chapter Parallelization +1. Create `_generateSingleChapterSectionsStructure()` method +2. Extract chapter processing logic +3. Refactor `_generateChapterSectionsStructure()` to use parallel processing +4. Test with single chapter +5. Test with multiple chapters +6. Verify order preservation +7. Verify error handling + +### Step 3: Implement Section Parallelization +1. Create `_processSingleSection()` method +2. Extract section processing logic into helper methods +3. Refactor `_fillChapterSections()` to use parallel processing for sections +4. Test with single section +5. Test with multiple sections +6. Test with multiple chapters +7. Verify order preservation +8. Verify error handling + +### Step 4: Testing & Validation +1. Test with various document structures +2. Test error scenarios +3. Verify progress reporting accuracy +4. Performance testing (compare sequential vs parallel) +5. Verify final output order matches input order + +## Code Structure + +### New Methods to Create + +```python +async def _generateSingleChapterSectionsStructure( + self, + chapter: Dict[str, Any], + chapterIndex: int, + chapterId: str, + chapterLevel: int, + chapterTitle: str, + generationHint: str, + contentPartIds: List[str], + contentPartInstructions: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + language: str, + parentOperationId: str +) -> None: + """Generate sections structure for a single chapter (used for parallel processing).""" + # Extract logic from current sequential loop + # Modify chapter dict in place + # Handle errors internally, raise if critical + +async def _processSingleSection( + self, + section: Dict[str, Any], + sectionIndex: int, + totalSections: int, + chapterIndex: int, + totalChapters: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentParts: List[ContentPart], + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str, + 
calculateOverallProgress: Callable +) -> List[Dict[str, Any]]: + """Process a single section and return its elements.""" + # Extract logic from current sequential loop + # Return elements list + # Return error element on failure (don't raise) + +async def _processSectionAggregation( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentPartIds: List[str], + contentFormats: Dict[str, str], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process section with aggregation (multiple parts together).""" + # Extract aggregation logic + # Return elements list + +async def _processSectionGeneration( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentType: str, + userPrompt: str, + generationHint: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process section generation without content parts (only generationHint).""" + # Extract generation logic + # Return elements list + +async def _processSectionParts( + self, + section: Dict[str, Any], + sectionId: str, + sectionTitle: str, + sectionIndex: int, + totalSections: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentPartIds: List[str], + contentFormats: Dict[str, str], + contentParts: List[ContentPart], + contentType: str, + useAiCall: bool, + generationHint: str, + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str +) -> List[Dict[str, Any]]: + """Process individual parts in a section.""" + # Extract individual part processing logic + # Return elements list +``` + +### Modified Methods + +```python +async def 
_generateChapterSectionsStructure( + self, + chapterStructure: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + parentOperationId: str +) -> Dict[str, Any]: + """Generate sections structure for all chapters in parallel.""" + # Collect chapters with indices + # Create tasks + # Execute in parallel + # Process results in order + # Update progress + +async def _fillChapterSections( + self, + chapterStructure: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + fillOperationId: str +) -> Dict[str, Any]: + """Fill sections with content, processing sections in parallel within each chapter.""" + # Process chapters sequentially + # For each chapter, process sections in parallel + # Maintain order + # Update progress +``` + +## Testing Strategy + +### Unit Tests +1. Test `_generateSingleChapterSectionsStructure` independently +2. Test `_processSingleSection` independently +3. Test helper methods independently + +### Integration Tests +1. Test parallel chapter processing with multiple chapters +2. Test parallel section processing with multiple sections +3. Test error handling (one chapter/section fails) +4. Test order preservation + +### Performance Tests +1. Measure sequential vs parallel execution time +2. Verify parallel processing is faster +3. Check resource usage (memory, CPU) + +## Risk Mitigation + +### Risks +1. **Order not preserved**: Use `zip` with original order +2. **Race conditions**: No shared mutable state between tasks +3. **Progress reporting incorrect**: Update progress when tasks complete +4. **Errors not handled**: Use `return_exceptions=True` and check results +5. **Performance degradation**: Test and measure, fallback to sequential if needed + +### Safety Measures +1. Keep sequential implementation as fallback (commented out) +2. Add feature flag to enable/disable parallel processing +3. Extensive logging for debugging +4. Gradual rollout (test with small datasets first) + +## Migration Path + +1. 
**Phase 1**: Implement chapter parallelization, test thoroughly +2. **Phase 2**: Implement section parallelization, test thoroughly +3. **Phase 3**: Enable both in production with monitoring +4. **Phase 4**: Remove sequential fallback code (if stable) + +## Notes + +- All async methods must use `await` correctly +- Progress updates happen asynchronously (may appear out of order in logs) +- Final result order is guaranteed by processing results in order +- Error handling is per-task, not global +- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts) + diff --git a/modules/services/serviceAi/REFACTORING_PLAN.md b/modules/services/serviceAi/REFACTORING_PLAN.md deleted file mode 100644 index 2ce7a717..00000000 --- a/modules/services/serviceAi/REFACTORING_PLAN.md +++ /dev/null @@ -1,126 +0,0 @@ -# Refactoring Plan für mainServiceAi.py - -## Ziel -Aufteilen des 3000-Zeilen-Moduls in überschaubare Submodule (~300-600 Zeilen pro Modul). - -## Vorgeschlagene Struktur - -### Bereits erstellt: -1. ✅ `subResponseParsing.py` - ResponseParser Klasse -2. ✅ `subDocumentIntents.py` - DocumentIntentAnalyzer Klasse - -### Noch zu erstellen: -3. `subContentExtraction.py` - ContentExtractor Klasse - - `extractAndPrepareContent()` (~490 Zeilen) - - `extractTextFromImage()` (~55 Zeilen) - - `processTextContentWithAi()` (~72 Zeilen) - - `_isBinary()` (~10 Zeilen) - -4. `subStructureGeneration.py` - StructureGenerator Klasse - - `generateStructure()` (~60 Zeilen) - - `_buildStructurePrompt()` (~130 Zeilen) - -5. `subStructureFilling.py` - StructureFiller Klasse - - `fillStructure()` (~290 Zeilen) - - `_buildSectionGenerationPrompt()` (~185 Zeilen) - - `_findContentPartById()` (~5 Zeilen) - - `_needsAggregation()` (~20 Zeilen) - -6. 
`subAiCallLooping.py` - AiCallLooper Klasse - - `callAiWithLooping()` (~405 Zeilen) - - `_defineKpisFromPrompt()` (~92 Zeilen) - -## Refactoring-Schritte für mainServiceAi.py - -### Schritt 1: Submodule-Initialisierung erweitern - -```python -def _initializeSubmodules(self): - """Initialize all submodules after aiObjects is ready.""" - if self.aiObjects is None: - raise RuntimeError("aiObjects must be initialized before initializing submodules") - - if self.extractionService is None: - logger.info("Initializing ExtractionService...") - self.extractionService = ExtractionService(self.services) - - # Neue Submodule initialisieren - from modules.services.serviceAi.subResponseParsing import ResponseParser - from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer - from modules.services.serviceAi.subContentExtraction import ContentExtractor - from modules.services.serviceAi.subStructureGeneration import StructureGenerator - from modules.services.serviceAi.subStructureFilling import StructureFiller - - if not hasattr(self, 'responseParser'): - self.responseParser = ResponseParser(self.services) - - if not hasattr(self, 'intentAnalyzer'): - self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self) - - if not hasattr(self, 'contentExtractor'): - self.contentExtractor = ContentExtractor(self.services, self) - - if not hasattr(self, 'structureGenerator'): - self.structureGenerator = StructureGenerator(self.services, self) - - if not hasattr(self, 'structureFiller'): - self.structureFiller = StructureFiller(self.services, self) -``` - -### Schritt 2: Methoden durch Delegation ersetzen - -**Beispiel für Response Parsing:** -```python -# ALT: -def _extractSectionsFromResponse(self, ...): - # 100 Zeilen Code - ... - -# NEU: -def _extractSectionsFromResponse(self, ...): - return self.responseParser.extractSectionsFromResponse(...) 
-``` - -**Beispiel für Document Intents:** -```python -# ALT: -async def _clarifyDocumentIntents(self, ...): - # 100 Zeilen Code - ... - -# NEU: -async def _clarifyDocumentIntents(self, ...): - return await self.intentAnalyzer.clarifyDocumentIntents(...) -``` - -### Schritt 3: Helper-Methoden beibehalten - -Kleine Helper-Methoden bleiben im Hauptmodul: -- `_buildPromptWithPlaceholders()` -- `_getIntentForDocument()` -- `_shouldSkipContentPart()` -- `_determineDocumentName()` - -### Schritt 4: Public API unverändert lassen - -Die öffentliche API (`callAiPlanning`, `callAiContent`) bleibt unverändert. - -## Erwartete Ergebnis-Größen - -- `mainServiceAi.py`: ~800-1000 Zeilen (von 3016) -- `subResponseParsing.py`: ~200 Zeilen ✅ -- `subDocumentIntents.py`: ~300 Zeilen ✅ -- `subContentExtraction.py`: ~600 Zeilen -- `subStructureGeneration.py`: ~200 Zeilen -- `subStructureFilling.py`: ~400 Zeilen -- `subAiCallLooping.py`: ~500 Zeilen - -**Gesamt: ~3000 Zeilen** (gleich, aber besser organisiert) - -## Vorteile - -1. **Übersichtlichkeit**: Jedes Modul hat eine klare Verantwortlichkeit -2. **Wartbarkeit**: Änderungen sind lokalisiert -3. **Testbarkeit**: Module können einzeln getestet werden -4. 
**Wiederverwendbarkeit**: Module können in anderen Kontexten verwendet werden - diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 9839093d..65bae155 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -676,6 +676,7 @@ Respond with ONLY a JSON object in this exact format: ) # Schritt 5D: Fülle Struktur + # Language will be extracted from services (user intention analysis) in fillStructure filledStructure = await self._fillStructure( structure, contentParts or [], diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py index 8ebafd23..6e2c90b5 100644 --- a/modules/services/serviceAi/subAiCallLooping.py +++ b/modules/services/serviceAi/subAiCallLooping.py @@ -14,7 +14,7 @@ from typing import Dict, Any, List, Optional, Callable from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState from modules.datamodels.datamodelExtraction import ContentPart -from modules.shared.jsonUtils import buildContinuationContext, extractJsonString +from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler logger = logging.getLogger(__name__) @@ -122,10 +122,14 @@ class AiCallLooper: ) # Write the ACTUAL prompt sent to AI - if iteration == 1: - self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt") - else: - self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}") + # For section content generation: only write one prompt file (first iteration) + # For document generation: write prompt for each iteration + isSectionContent = "_section_" in debugPrefix + if iteration == 1 or not isSectionContent: + if iteration == 1: + 
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt") + elif not isSectionContent: + self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}") response = await self.aiService.callAi(request) result = response.content @@ -146,10 +150,13 @@ class AiCallLooper: self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})") # Write raw AI response to debug file - if iteration == 1: - self.services.utils.writeDebugFile(result, f"{debugPrefix}_response") - else: - self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") + # For section content generation: only write one response file (first iteration) + # For document generation: write response for each iteration + if iteration == 1 or not isSectionContent: + if iteration == 1: + self.services.utils.writeDebugFile(result, f"{debugPrefix}_response") + elif not isSectionContent: + self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}") # Emit stats for this iteration (only if workflow exists and has id) if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id: @@ -192,6 +199,38 @@ class AiCallLooper: # Store raw response for continuation (even if broken) lastRawResponse = result + # Check if this is section content generation (has "elements" not "sections") + # Section content generation returns JSON with "elements" array, not document structure with "sections" + isSectionContentGeneration = False + parsedJsonForSection = None + extractedJsonForSection = None + try: + extractedJsonForSection = extractJsonString(result) + parsedJson, parseError, _ = tryParseJson(extractedJsonForSection) + if parseError is None and parsedJson: + parsedJsonForSection = parsedJson + # Check if JSON has "elements" (section content) or "sections" (document structure) + if isinstance(parsedJson, dict): + if "elements" in 
parsedJson: + isSectionContentGeneration = True + elif isinstance(parsedJson, list) and len(parsedJson) > 0: + # Check if it's a list of elements (section content format) + if isinstance(parsedJson[0], dict) and "type" in parsedJson[0]: + isSectionContentGeneration = True + except Exception: + pass + + if isSectionContentGeneration: + # This is section content generation - return the JSON directly + # No need to extract sections, just return the complete JSON string + logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly") + if iterationOperationId: + self.services.chat.progressLogFinish(iterationOperationId, True) + # Note: Debug files (_prompt and _response) are already written above for iteration 1 + # No need to write _final_result as it's redundant with _response + final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result) + return final_json + # Extract sections from response (handles both valid and broken JSON) # Only for document generation (JSON responses) # CRITICAL: Pass allSections and accumulationState to enable string accumulation @@ -365,7 +404,10 @@ class AiCallLooper: self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})") # Log merged sections for debugging - self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}") + # For section content generation: skip merged sections debug files (only one prompt/response needed) + isSectionContent = "_section_" in debugPrefix + if not isSectionContent: + self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}") # Check if we should continue (completion detection) # Simple logic: JSON completeness determines continuation @@ -433,7 +475,10 @@ class AiCallLooper: final_result = 
self.responseParser.buildFinalResultFromSections(allSections, documentMetadata) # Write final result to debug file - self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result") + # For section content generation: skip final_result debug file (response already written) + isSectionContent = "_section_" in debugPrefix + if not isSectionContent: + self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result") return final_result diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py index 7089103c..138f6572 100644 --- a/modules/services/serviceAi/subStructureFilling.py +++ b/modules/services/serviceAi/subStructureFilling.py @@ -11,6 +11,7 @@ Handles filling document structure with content, including: import json import logging import copy +import asyncio from typing import Dict, Any, List, Optional from modules.datamodels.datamodelExtraction import ContentPart @@ -27,12 +28,27 @@ class StructureFiller: self.services = services self.aiService = aiService + def _getUserLanguage(self) -> str: + """Get user language for document generation""" + try: + if self.services: + # Prefer detected language if available (from user intention analysis) + if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage: + return self.services.currentUserLanguage + # Fallback to user's preferred language + elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'): + return self.services.user.language + except Exception: + pass + return 'en' # Default fallback + async def fillStructure( self, structure: Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: Optional[str] = None ) -> Dict[str, Any]: """ Phase 5D: Chapter-Content-Generierung (Zwei-Phasen-Ansatz). 
@@ -45,6 +61,7 @@ class StructureFiller: contentParts: Alle vorbereiteten ContentParts userPrompt: User-Anfrage parentOperationId: Parent Operation-ID für ChatLog-Hierarchie + language: Language identified from user intention analysis (e.g., "de", "en", "fr") Returns: Gefüllte Struktur mit elements in jeder Section (nach Flattening) @@ -64,6 +81,13 @@ class StructureFiller: logger.error(error_msg) raise ValueError(error_msg) + # Get language from services (user intention analysis) or parameter + if language is None: + language = self._getUserLanguage() + logger.debug(f"Using language from services (user intention analysis): {language}") + else: + logger.debug(f"Using provided language parameter: {language}") + # Starte ChatLog mit Parent-Referenz chapterCount = sum(len(doc.get("chapters", [])) for doc in structure.get("documents", [])) self.services.chat.progressLogStart( @@ -79,12 +103,12 @@ class StructureFiller: # Phase 5D.1: Sections-Struktur für jedes Chapter generieren filledStructure = await self._generateChapterSectionsStructure( - filledStructure, contentParts, userPrompt, fillOperationId + filledStructure, contentParts, userPrompt, fillOperationId, language ) # Phase 5D.2: Sections mit ContentParts füllen filledStructure = await self._fillChapterSections( - filledStructure, contentParts, userPrompt, fillOperationId + filledStructure, contentParts, userPrompt, fillOperationId, language ) # Flattening: Chapters zu Sections konvertieren @@ -103,19 +127,133 @@ class StructureFiller: logger.error(f"Error in fillStructure: {str(e)}") raise + async def _generateSingleChapterSectionsStructure( + self, + chapter: Dict[str, Any], + chapterIndex: int, + chapterId: str, + chapterLevel: int, + chapterTitle: str, + generationHint: str, + contentPartIds: List[str], + contentPartInstructions: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + language: str, + parentOperationId: str, + totalChapters: int + ) -> None: + """ + Generate sections 
structure for a single chapter (used for parallel processing). + Modifies chapter dict in place. + """ + try: + # Update progress for chapter structure generation + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}" + ) + + chapterPrompt = self._buildChapterSectionsStructurePrompt( + chapterId=chapterId, + chapterLevel=chapterLevel, + chapterTitle=chapterTitle, + generationHint=generationHint, + contentPartIds=contentPartIds, + contentPartInstructions=contentPartInstructions, + contentParts=contentParts, + userPrompt=userPrompt, + language=language + ) + + # AI-Call für Chapter-Struktur-Generierung + # Note: Debug logging is handled by callAiPlanning + aiResponse = await self.aiService.callAiPlanning( + prompt=chapterPrompt, + debugType=f"chapter_structure_{chapterId}" + ) + + sectionsStructure = json.loads( + self.services.utils.jsonExtractString(aiResponse) + ) + + chapter["sections"] = sectionsStructure.get("sections", []) + + # Setze useAiCall Flag (falls nicht von AI gesetzt) + # WICHTIG: useAiCall kann nur true sein, wenn mindestens ein ContentPart Format "extracted" hat! + # "object" und "reference" Formate werden direkt als Elemente hinzugefügt, benötigen kein AI. 
+ for section in chapter["sections"]: + if "useAiCall" not in section: + contentType = section.get("content_type", "paragraph") + sectionContentPartIds = section.get("contentPartIds", []) + + # Prüfe ob mindestens ein ContentPart Format "extracted" hat + hasExtractedPart = False + for partId in sectionContentPartIds: + part = self._findContentPartById(partId, contentParts) + if part: + contentFormat = part.metadata.get("contentFormat", "unknown") + if contentFormat == "extracted": + hasExtractedPart = True + break + + # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind + useAiCall = False + if hasExtractedPart: + # Prüfe ob Transformation nötig ist + useAiCall = contentType != "paragraph" + + # Prüfe contentPartInstructions für Transformation + if not useAiCall: + for partId in sectionContentPartIds: + instruction = contentPartInstructions.get(partId, {}).get("instruction", "") + if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]: + useAiCall = True + break + + section["useAiCall"] = useAiCall + logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})") + + # Update progress after chapter completion + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Chapter {chapterIndex}/{totalChapters} completed: {chapterTitle}" + ) + + except Exception as e: + logger.error(f"Error generating sections structure for chapter {chapterId}: {str(e)}") + # Set empty sections on error + chapter["sections"] = [] + # Update progress even on error + progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 + self.services.chat.progressLogUpdate( + parentOperationId, + progress, + f"Chapter {chapterIndex}/{totalChapters} error: {chapterTitle}" + ) + raise + async def _generateChapterSectionsStructure( self, chapterStructure: 
Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: str ) -> Dict[str, Any]: """ - Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content). + Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content) in parallel. Sections enthalten: content_type, contentPartIds, generationHint, useAiCall """ # Count total chapters for progress tracking totalChapters = sum(len(doc.get("chapters", [])) for doc in chapterStructure.get("documents", [])) + + # Collect all chapters with their indices for parallel processing + chapterTasks = [] chapterIndex = 0 for doc in chapterStructure.get("documents", []): @@ -128,15 +266,10 @@ class StructureFiller: contentPartIds = chapter.get("contentPartIds", []) contentPartInstructions = chapter.get("contentPartInstructions", {}) - # Update progress for chapter structure generation - progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0 - self.services.chat.progressLogUpdate( - parentOperationId, - progress, - f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}" - ) - - chapterPrompt = self._buildChapterSectionsStructurePrompt( + # Create task for parallel processing + task = self._generateSingleChapterSectionsStructure( + chapter=chapter, + chapterIndex=chapterIndex, chapterId=chapterId, chapterLevel=chapterLevel, chapterTitle=chapterTitle, @@ -144,69 +277,935 @@ class StructureFiller: contentPartIds=contentPartIds, contentPartInstructions=contentPartInstructions, contentParts=contentParts, - userPrompt=userPrompt + userPrompt=userPrompt, + language=language, + parentOperationId=parentOperationId, + totalChapters=totalChapters ) - - # AI-Call für Chapter-Struktur-Generierung - # Note: Debug logging is handled by callAiPlanning - aiResponse = await self.aiService.callAiPlanning( - prompt=chapterPrompt, - debugType=f"chapter_structure_{chapterId}" - ) - - sectionsStructure = json.loads( - 
self.services.utils.jsonExtractString(aiResponse) - ) - - chapter["sections"] = sectionsStructure.get("sections", []) - - # Setze useAiCall Flag (falls nicht von AI gesetzt) - # WICHTIG: useAiCall kann nur true sein, wenn mindestens ein ContentPart Format "extracted" hat! - # "object" und "reference" Formate werden direkt als Elemente hinzugefügt, benötigen kein AI. - for section in chapter["sections"]: - if "useAiCall" not in section: - contentType = section.get("content_type", "paragraph") - contentPartIds = section.get("contentPartIds", []) - - # Prüfe ob mindestens ein ContentPart Format "extracted" hat - hasExtractedPart = False - for partId in contentPartIds: - part = self._findContentPartById(partId, contentParts) - if part: - contentFormat = part.metadata.get("contentFormat", "unknown") - if contentFormat == "extracted": - hasExtractedPart = True - break - - # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind - useAiCall = False - if hasExtractedPart: - # Prüfe ob Transformation nötig ist - useAiCall = contentType != "paragraph" - - # Prüfe contentPartInstructions für Transformation - if not useAiCall: - for partId in contentPartIds: - instruction = contentPartInstructions.get(partId, {}).get("instruction", "") - if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]: - useAiCall = True - break - - section["useAiCall"] = useAiCall - logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})") + chapterTasks.append((chapterIndex, chapter, task)) + + # Execute all chapter tasks in parallel + if chapterTasks: + # Create list of tasks (without indices for gather) + tasks = [task for _, _, task in chapterTasks] + + # Execute in parallel with error handling + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Process results in order and handle errors + for (originalIndex, originalChapter, _), 
result in zip(chapterTasks, results): + if isinstance(result, Exception): + logger.error(f"Error processing chapter {originalChapter.get('id')}: {str(result)}") + # Chapter already has empty sections set by _generateSingleChapterSectionsStructure + # Continue processing other chapters return chapterStructure + async def _processAiResponseForSection( + self, + aiResponse: Any, + contentType: str, + operationType: OperationTypeEnum, + sectionId: str, + generationHint: str, + generatedElements: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """ + Helper method to process AI response and extract elements. + Handles both IMAGE_GENERATE and DATA_ANALYSE operation types. + """ + elements = [] + + # Handle IMAGE_GENERATE differently - returns image data directly + if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: + import base64 + base64Data = "" + + # Convert image data to base64 string if needed + if isinstance(aiResponse.content, bytes): + base64Data = base64.b64encode(aiResponse.content).decode('utf-8') + elif isinstance(aiResponse.content, str): + # Check if it's already a JSON structure + try: + jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) + if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": + elements.append(jsonContent) + logger.debug("AI returned proper JSON image structure") + base64Data = None # Signal that image was already processed + elif isinstance(jsonContent, list) and len(jsonContent) > 0: + if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": + elements.extend(jsonContent) + logger.debug("AI returned proper JSON image structure in list") + base64Data = None # Signal that image was already processed + else: + base64Data = "" # Continue with normal processing + else: + base64Data = "" # Continue with normal processing + except (json.JSONDecodeError, ValueError, AttributeError): + base64Data = "" # Will be processed below + + # Process 
base64 if not already handled above + if base64Data is None: + # Already processed as JSON, skip base64 processing + pass + elif aiResponse.content.startswith("data:image/"): + # Extract base64 from data URI + base64Data = aiResponse.content.split(",", 1)[1] + else: + content_stripped = aiResponse.content.strip() + if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): + base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") + else: + base64Data = aiResponse.content + else: + base64Data = "" + + # Always create proper JSON structure for images (if not already processed) + if base64Data is None: + # Image already processed as JSON, skip + pass + elif base64Data: + elements.append({ + "type": "image", + "content": { + "base64Data": base64Data, + "altText": generationHint or "Generated image", + "caption": "" + } + }) + logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") + else: + logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}") + elements.append({ + "type": "error", + "message": f"Image generation returned empty or invalid content", + "sectionId": sectionId + }) + else: + # For non-image content: Use already parsed elements from _callAiWithLooping + if generatedElements: + elements.extend(generatedElements) + else: + # Fallback: Try to parse JSON response directly + try: + fallbackElements = json.loads( + self.services.utils.jsonExtractString(aiResponse.content) + ) + if isinstance(fallbackElements, list): + elements.extend(fallbackElements) + elif isinstance(fallbackElements, dict) and "elements" in fallbackElements: + elements.extend(fallbackElements["elements"]) + elif isinstance(fallbackElements, dict) and fallbackElements.get("type"): + elements.append(fallbackElements) + except (json.JSONDecodeError, ValueError) as json_error: + 
logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") + elements.append({ + "type": "error", + "message": f"Failed to parse JSON response: {str(json_error)}", + "sectionId": sectionId + }) + + return elements + + async def _processSingleSection( + self, + section: Dict[str, Any], + sectionIndex: int, + totalSections: int, + chapterIndex: int, + totalChapters: int, + chapterId: str, + chapterOperationId: str, + fillOperationId: str, + contentParts: List[ContentPart], + userPrompt: str, + all_sections_list: List[Dict[str, Any]], + language: str, + calculateOverallProgress: callable + ) -> List[Dict[str, Any]]: + """ + Process a single section and return its elements. + Used for parallel processing of sections within a chapter. + """ + sectionId = section.get("id") + sectionTitle = section.get("title", sectionId) + contentPartIds = section.get("contentPartIds", []) + contentFormats = section.get("contentFormats", {}) + generationHint = section.get("generationHint") or section.get("generation_hint") + contentType = section.get("content_type", "paragraph") + useAiCall = section.get("useAiCall", False) + + # Update overall progress at start of section + overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections) + self.services.chat.progressLogUpdate( + fillOperationId, + overallProgress, + f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}" + ) + + # WICHTIG: Wenn keine ContentParts vorhanden sind UND kein generationHint, kann kein AI-Call gemacht werden + if len(contentPartIds) == 0 and not generationHint: + useAiCall = False + logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False") + elif len(contentPartIds) == 0 and generationHint and not useAiCall: + useAiCall = True + logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)") + + elements = [] + + 
# Prüfe ob Aggregation nötig ist + needsAggregation = self._needsAggregation( + contentType=contentType, + contentPartCount=len(contentPartIds) + ) + + logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}") + + try: + if needsAggregation and useAiCall: + # Aggregation: Alle Parts zusammen verarbeiten + sectionParts = [ + self._findContentPartById(pid, contentParts) + for pid in contentPartIds + ] + sectionParts = [p for p in sectionParts if p is not None] + + if sectionParts: + # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt) + extractedParts = [ + p for p in sectionParts + if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted" + ] + nonExtractedParts = [ + p for p in sectionParts + if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted" + ] + + # Verarbeite non-extracted Parts separat (reference, object) + for part in nonExtractedParts: + contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat")) + + if contentFormat == "reference": + elements.append({ + "type": "reference", + "documentReference": part.metadata.get("documentReference"), + "label": part.metadata.get("usageHint", part.label) + }) + elif contentFormat == "object": + if part.typeGroup == "image": + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + elements.append({ + "type": part.typeGroup, + "content": { + "data": part.data, + "mimeType": part.mimeType, + "label": part.metadata.get("usageHint", part.label) + } + }) + + # Aggregiere extracted Parts mit AI + if extractedParts: + logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI") + isAggregation = True 
+ generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=extractedParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation (Aggregation)", + f"Section {sectionIndex + 1}/{totalSections}", + f"{sectionTitle} ({len(extractedParts)} parts)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) > maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + + # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + else: + async def 
buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. + +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. + +Return ONLY the continuation JSON (starting from the incomplete element). 
+The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": extractedParts, + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=extractedParts + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", []) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") + # Note: Debug files are written by _callAiWithLooping 
using debugPrefix + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + # Process AI response + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" + ) + + else: + # Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts + if len(contentPartIds) == 0 and useAiCall and generationHint: + # Generate content from scratch using only generationHint + logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only") + generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=[], + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=False, + language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation", + f"Section 
{sectionIndex + 1}/{totalSections}", + f"{sectionTitle} (from generationHint)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) > maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + + # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + else: + isAggregation = False + + async def buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + 
allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. + +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. + +Return ONLY the continuation JSON (starting from the incomplete element). +The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": [], + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=[] + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", 
[]) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") + # Note: Debug files are written by _callAiWithLooping using debugPrefix + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" + ) + + # Einzelverarbeitung: Jeder Part einzeln + for partId in 
contentPartIds: + part = self._findContentPartById(partId, contentParts) + if not part: + continue + + contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat")) + + if contentFormat == "reference": + elements.append({ + "type": "reference", + "documentReference": part.metadata.get("documentReference"), + "label": part.metadata.get("usageHint", part.label) + }) + + elif contentFormat == "object": + if part.typeGroup == "image": + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + elements.append({ + "type": part.typeGroup, + "content": { + "data": part.data, + "mimeType": part.mimeType, + "label": part.metadata.get("usageHint", part.label) + } + }) + + elif contentFormat == "extracted": + if useAiCall and generationHint: + # AI-Call mit einzelnen ContentPart + logger.debug(f"Processing section {sectionId}: Single extracted part with AI call") + generationPrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=[part], + userPrompt=userPrompt, + generationHint=generationHint, + allSections=all_sections_list, + sectionIndex=sectionIndex, + isAggregation=False, + language=language + ) + + sectionOperationId = f"{fillOperationId}_section_{sectionId}" + self.services.chat.progressLogStart( + sectionOperationId, + "Section Generation", + f"Section {sectionIndex + 1}/{totalSections}", + f"{sectionTitle} (single part)", + parentOperationId=chapterOperationId + ) + + try: + self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") + + self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") + + operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE + + if operationType == OperationTypeEnum.IMAGE_GENERATE: + maxPromptLength = 4000 + if len(generationPrompt) 
> maxPromptLength: + logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") + generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] + + # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + generationPrompt, + f"{chapterId}_section_{sectionId}_prompt" + ) + + request = AiCallRequest( + prompt=generationPrompt, + contentParts=[], + options=AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + ) + aiResponse = await self.aiService.callAi(request) + generatedElements = [] + + # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping) + self.services.utils.writeDebugFile( + aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse), + f"{chapterId}_section_{sectionId}_response" + ) + else: + isAggregation = False + + async def buildSectionPromptWithContinuation( + section: Dict[str, Any], + contentParts: List[ContentPart], + userPrompt: str, + generationHint: str, + allSections: List[Dict[str, Any]], + sectionIndex: int, + isAggregation: bool, + continuationContext: Dict[str, Any], + services: Any + ) -> str: + basePrompt = self._buildSectionGenerationPrompt( + section=section, + contentParts=contentParts, + userPrompt=userPrompt, + generationHint=generationHint, + allSections=allSections, + sectionIndex=sectionIndex, + isAggregation=isAggregation, + language=language + ) + + continuationInfo = continuationContext.get("delivered_summary", "") + cutOffElement = continuationContext.get("cut_off_element", "") + + continuationPrompt = f"""{basePrompt} + +--- CONTINUATION REQUEST --- +The previous JSON response was incomplete. Please continue from where it stopped. 
+ +PREVIOUSLY DELIVERED SUMMARY: +{continuationInfo} + +LAST INCOMPLETE ELEMENT: +{cutOffElement} + +TASK: Continue generating the JSON elements array from where it was cut off. +Complete the incomplete element and continue with remaining elements. + +Return ONLY the continuation JSON (starting from the incomplete element). +The JSON should be a fragment that can be merged with the previous response.""" + return continuationPrompt + + options = AiCallOptions( + operationType=operationType, + priority=PriorityEnum.BALANCED, + processingMode=ProcessingModeEnum.DETAILED + ) + + aiResponseJson = await self.aiService._callAiWithLooping( + prompt=generationPrompt, + options=options, + debugPrefix=f"{chapterId}_section_{sectionId}", + promptBuilder=buildSectionPromptWithContinuation, + promptArgs={ + "section": section, + "contentParts": [part], + "userPrompt": userPrompt, + "generationHint": generationHint, + "allSections": all_sections_list, + "sectionIndex": sectionIndex, + "isAggregation": isAggregation, + "services": self.services + }, + operationId=sectionOperationId, + userPrompt=userPrompt, + contentParts=[part] + ) + + try: + parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson)) + if isinstance(parsedResponse, list): + generatedElements = parsedResponse + elif isinstance(parsedResponse, dict): + if "elements" in parsedResponse: + generatedElements = parsedResponse["elements"] + elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0: + firstSection = parsedResponse["sections"][0] + generatedElements = firstSection.get("elements", []) + elif parsedResponse.get("type"): + generatedElements = [parsedResponse] + else: + generatedElements = [] + else: + generatedElements = [] + + class AiResponse: + def __init__(self, content): + self.content = content + + aiResponse = AiResponse(aiResponseJson) + except Exception as parseError: + logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: 
{str(parseError)}") + class AiResponse: + def __init__(self, content): + self.content = content + aiResponse = AiResponse(aiResponseJson) + generatedElements = [] + + self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") + # Note: Debug files are written by _callAiWithLooping using debugPrefix + + self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") + + responseElements = await self._processAiResponseForSection( + aiResponse=aiResponse, + contentType=contentType, + operationType=operationType, + sectionId=sectionId, + generationHint=generationHint, + generatedElements=generatedElements + ) + elements.extend(responseElements) + + self.services.chat.progressLogFinish(sectionOperationId, True) + + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + self.services.chat.progressLogFinish(sectionOperationId, False) + elements.append({ + "type": "error", + "message": f"Error generating section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + logger.error(f"Error generating section {sectionId}: {str(e)}") + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" + ) + else: + # Füge extrahierten Content direkt hinzu (kein AI-Call) + if part.typeGroup == "image": + logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call") + elements.append({ + "type": "image", + "content": { + "base64Data": part.data, + "altText": part.metadata.get("usageHint", part.label), + "caption": part.metadata.get("caption", "") + } + }) + else: + logger.debug(f"Processing section {sectionId}: Single extracted 
TEXT part WITHOUT AI call") + elements.append({ + "type": "extracted_text", + "content": part.data, + "source": part.metadata.get("documentId"), + "extractionPrompt": part.metadata.get("extractionPrompt") + }) + + # Update progress after section completion + chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 + self.services.chat.progressLogUpdate( + chapterOperationId, + chapterProgress, + f"Section {sectionIndex + 1}/{totalSections} completed" + ) + + overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections) + self.services.chat.progressLogUpdate( + fillOperationId, + overallProgress, + f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed" + ) + + except Exception as e: + logger.error(f"Unexpected error processing section {sectionId}: {str(e)}") + elements.append({ + "type": "error", + "message": f"Unexpected error processing section {sectionId}: {str(e)}", + "sectionId": sectionId + }) + + return elements + async def _fillChapterSections( self, chapterStructure: Dict[str, Any], contentParts: List[ContentPart], userPrompt: str, - parentOperationId: str + parentOperationId: str, + language: str ) -> Dict[str, Any]: """ Phase 5D.2: Füllt Sections mit ContentParts. 
""" + # Sammle alle Sections für Kontext-Informationen (für alle Sections) all_sections_list = [] for doc in chapterStructure.get("documents", []): @@ -252,736 +1251,48 @@ class StructureFiller: parentOperationId=fillOperationId ) - # Process sections within chapter + # Process sections within chapter in parallel + sectionTasks = [] for sectionIndex, section in enumerate(sections): - sectionId = section.get("id") - sectionTitle = section.get("title", sectionId) - contentPartIds = section.get("contentPartIds", []) - contentFormats = section.get("contentFormats", {}) - # Check both camelCase and snake_case for generationHint - generationHint = section.get("generationHint") or section.get("generation_hint") - contentType = section.get("content_type", "paragraph") - useAiCall = section.get("useAiCall", False) - - # Update overall progress at start of section - overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections) - self.services.chat.progressLogUpdate( - fillOperationId, - overallProgress, - f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}" + # Create task for parallel processing + task = self._processSingleSection( + section=section, + sectionIndex=sectionIndex, + totalSections=totalSections, + chapterIndex=chapterIndex, + totalChapters=totalChapters, + chapterId=chapterId, + chapterOperationId=chapterOperationId, + fillOperationId=fillOperationId, + contentParts=contentParts, + userPrompt=userPrompt, + all_sections_list=all_sections_list, + language=language, + calculateOverallProgress=calculateOverallProgress ) + sectionTasks.append((sectionIndex, section, task)) + + # Execute all section tasks in parallel + if sectionTasks: + # Create list of tasks (without indices for gather) + tasks = [task for _, _, task in sectionTasks] - # WICHTIG: Wenn keine ContentParts vorhanden sind UND kein generationHint, kann kein AI-Call gemacht werden - # Aber: Wenn generationHint 
vorhanden ist, SOLLTE AI verwendet werden, auch wenn useAiCall=false gesetzt ist - # (z.B. wenn AI die Struktur generiert hat, aber useAiCall falsch gesetzt wurde) - if len(contentPartIds) == 0 and not generationHint: - useAiCall = False - logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False") - elif len(contentPartIds) == 0 and generationHint and not useAiCall: - # Override: If there's a generationHint but no content parts, we should use AI - # This handles cases where structure generation set useAiCall=false incorrectly - useAiCall = True - logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)") + # Execute in parallel with error handling + results = await asyncio.gather(*tasks, return_exceptions=True) - elements = [] - - # Prüfe ob Aggregation nötig ist - needsAggregation = self._needsAggregation( - contentType=contentType, - contentPartCount=len(contentPartIds) - ) - - logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}") - - if needsAggregation and useAiCall: - # Aggregation: Alle Parts zusammen verarbeiten - sectionParts = [ - self._findContentPartById(pid, contentParts) - for pid in contentPartIds - ] - sectionParts = [p for p in sectionParts if p is not None] - - if sectionParts: - # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt) - extractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted" - ] - nonExtractedParts = [ - p for p in sectionParts - if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted" - ] - - # Verarbeite non-extracted Parts separat (reference, object) - for part in nonExtractedParts: - contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat")) 
- - if contentFormat == "reference": - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - elif contentFormat == "object": - # Nested content structure for objects - if part.typeGroup == "image": - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - "altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - elements.append({ - "type": part.typeGroup, - "content": { - "data": part.data, - "mimeType": part.mimeType, - "label": part.metadata.get("usageHint", part.label) - } - }) - - # Aggregiere extracted Parts mit AI - if extractedParts: - logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=extractedParts, # ALLE PARTS für Aggregation! - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=True - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation (Aggregation)", - f"Section {sectionIndex + 1}/{totalSections}", - f"{sectionTitle} ({len(extractedParts)} parts)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, 
"Calling AI for content generation") - - # Verwende callAi für ContentParts-Unterstützung (nicht callAiPlanning!) - # Use IMAGE_GENERATE for image content type - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks - contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else extractedParts - request = AiCallRequest( - prompt=generationPrompt, - contentParts=contentPartsForCall, # Empty for IMAGE_GENERATE, all parts for others - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data 
to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # Check if it's already a JSON structure - try: - # Try to parse as JSON first - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - # If it's already a proper JSON structure with image element, use it - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - continue - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - # Check if first element is an image - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - continue - except (json.JSONDecodeError, ValueError, AttributeError): - # Not JSON, treat as base64 string or data URI - pass - - # Already base64 string or data URI - if aiResponse.content.startswith("data:image/"): - # Extract base64 from data URI - base64Data = aiResponse.content.split(",", 1)[1] - else: - # Check if it looks like base64 (alphanumeric + / + =) - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - # Looks like base64, use it - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") - else: - base64Data = aiResponse.content - else: - base64Data = "" - - # Always create proper JSON structure for images - if base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - 
logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty or invalid content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - # Single element in dict format - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - # Try to extract any image data that might be in the response - if contentType == "image": - # Check if response content might be base64 image data - content_str = str(aiResponse.content) - if len(content_str) > 100: - elements.append({ - "type": "error", - "message": f"Failed to parse image generation response: {str(json_error)}", - "sectionId": sectionId - }) - else: - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) 
- self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - # NICHT raise - Section wird mit Fehlermeldung gerendert - - else: - # Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts - # Handle case where no content parts but generationHint exists (e.g., Executive Summary) - if len(contentPartIds) == 0 and useAiCall and generationHint: - # Generate content from scratch using only generationHint - logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=[], # NO PARTS - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=False - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation", - f"Section {sectionIndex + 1}/{totalSections}", - f"{sectionTitle} (from generationHint)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: 
{chapterId}_section_{sectionId}_prompt") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - - # Verwende callAi ohne ContentParts - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - request = AiCallRequest( - prompt=generationPrompt, - contentParts=[], # NO PARTS - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # Check if it's already a JSON structure 
- try: - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - continue - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - continue - except (json.JSONDecodeError, ValueError, AttributeError): - pass - - # Already base64 string or data URI - if aiResponse.content.startswith("data:image/"): - base64Data = aiResponse.content.split(",", 1)[1] - else: - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") - else: - base64Data = aiResponse.content - else: - base64Data = "" - - # Always create proper JSON structure for images - if base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in generatedElements: - 
elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) - self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - - # Einzelverarbeitung: Jeder Part einzeln - for partId in contentPartIds: - part = self._findContentPartById(partId, contentParts) - if not part: - continue - - contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat")) - - if contentFormat == "reference": - # Füge Dokument-Referenz hinzu - elements.append({ - "type": "reference", - "documentReference": part.metadata.get("documentReference"), - "label": part.metadata.get("usageHint", part.label) - }) - - elif contentFormat == "object": - # Füge base64 
Object hinzu (nested in content structure) - if part.typeGroup == "image": - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - "altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - # For other object types, use generic structure - elements.append({ - "type": part.typeGroup, - "content": { - "data": part.data, - "mimeType": part.mimeType, - "label": part.metadata.get("usageHint", part.label) - } - }) - - elif contentFormat == "extracted": - # WICHTIG: Prüfe sowohl useAiCall als auch generationHint - if useAiCall and generationHint: - # AI-Call mit einzelnen ContentPart - logger.debug(f"Processing section {sectionId}: Single extracted part with AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)})") - generationPrompt = self._buildSectionGenerationPrompt( - section=section, - contentParts=[part], # EIN PART - userPrompt=userPrompt, - generationHint=generationHint, - allSections=all_sections_list, - sectionIndex=sectionIndex, - isAggregation=False - ) - - # Erstelle Operation-ID für Section-Generierung - sectionOperationId = f"{fillOperationId}_section_{sectionId}" - - # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId) - self.services.chat.progressLogStart( - sectionOperationId, - "Section Generation", - f"Section {sectionIndex + 1}/{totalSections}", - f"{sectionTitle} (single part)", - parentOperationId=chapterOperationId - ) - - try: - # Update: Building prompt - self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt") - - # Debug: Log Prompt - self.services.utils.writeDebugFile( - generationPrompt, - f"{chapterId}_section_{sectionId}_prompt" - ) - logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt") - - # Update: Calling AI - self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation") - - # Verwende callAi für ContentParts-Unterstützung - 
# Use IMAGE_GENERATE for image content type - operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE - - # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit) - if operationType == OperationTypeEnum.IMAGE_GENERATE: - maxPromptLength = 4000 - if len(generationPrompt) > maxPromptLength: - logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters") - # Keep the beginning (task, metadata, generation hint) and truncate from end - generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline - - # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks - contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else [part] - request = AiCallRequest( - prompt=generationPrompt, - contentParts=contentPartsForCall, - options=AiCallOptions( - operationType=operationType, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.DETAILED - ) - ) - aiResponse = await self.aiService.callAi(request) - - # Update: Processing response - self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response") - - # Debug: Log Response - self.services.utils.writeDebugFile( - aiResponse.content, - f"{chapterId}_section_{sectionId}_response" - ) - logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response") - - # Update: Validating content - self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content") - - # Handle IMAGE_GENERATE differently - returns image data directly - if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE: - import base64 - base64Data = "" - - # Convert image data to base64 string if needed - if isinstance(aiResponse.content, bytes): - base64Data = base64.b64encode(aiResponse.content).decode('utf-8') - elif isinstance(aiResponse.content, str): - # 
Check if it's already a JSON structure - try: - jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content)) - if isinstance(jsonContent, dict) and jsonContent.get("type") == "image": - elements.append(jsonContent) - logger.debug("AI returned proper JSON image structure") - continue - elif isinstance(jsonContent, list) and len(jsonContent) > 0: - if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image": - elements.extend(jsonContent) - logger.debug("AI returned proper JSON image structure in list") - continue - except (json.JSONDecodeError, ValueError, AttributeError): - pass - - # Already base64 string or data URI - if aiResponse.content.startswith("data:image/"): - base64Data = aiResponse.content.split(",", 1)[1] - else: - content_stripped = aiResponse.content.strip() - if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]): - base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "") - else: - base64Data = aiResponse.content - else: - base64Data = "" - - # Always create proper JSON structure for images - if base64Data: - elements.append({ - "type": "image", - "content": { - "base64Data": base64Data, - "altText": generationHint or "Generated image", - "caption": "" - } - }) - logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}") - else: - logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}") - elements.append({ - "type": "error", - "message": f"Image generation returned empty content", - "sectionId": sectionId - }) - else: - # Parse JSON response for other content types - try: - generatedElements = json.loads( - self.services.utils.jsonExtractString(aiResponse.content) - ) - if isinstance(generatedElements, list): - elements.extend(generatedElements) - elif isinstance(generatedElements, dict) and "elements" in 
generatedElements: - elements.extend(generatedElements["elements"]) - elif isinstance(generatedElements, dict) and generatedElements.get("type"): - elements.append(generatedElements) - except (json.JSONDecodeError, ValueError) as json_error: - logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}") - elements.append({ - "type": "error", - "message": f"Failed to parse JSON response: {str(json_error)}", - "sectionId": sectionId - }) - - # ChatLog abschließen - self.services.chat.progressLogFinish(sectionOperationId, True) - - # Update chapter progress after section completion - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - except Exception as e: - # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!) - self.services.chat.progressLogFinish(sectionOperationId, False) - elements.append({ - "type": "error", - "message": f"Error generating section {sectionId}: {str(e)}", - "sectionId": sectionId - }) - logger.error(f"Error generating section {sectionId}: {str(e)}") - # Still update chapter progress even on error - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed (with errors)" - ) - # NICHT raise - Section wird mit Fehlermeldung gerendert - else: - # Füge extrahierten Content direkt hinzu (kein AI-Call) - # CRITICAL: Check part typeGroup to determine correct element type - if part.typeGroup == "image": - # Image content should be added as image element, not extracted_text - logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call - adding as image element") - elements.append({ - "type": "image", - "content": { - "base64Data": part.data, - 
"altText": part.metadata.get("usageHint", part.label), - "caption": part.metadata.get("caption", "") - } - }) - else: - # Text content - add as extracted_text element - logger.debug(f"Processing section {sectionId}: Single extracted TEXT part WITHOUT AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)}) - adding extracted text directly") - elements.append({ - "type": "extracted_text", - "content": part.data, - "source": part.metadata.get("documentId"), - "extractionPrompt": part.metadata.get("extractionPrompt") - }) - - # Assign elements to section (for all processing paths) - section["elements"] = elements - - # Update chapter progress after section completion (for all sections, including non-AI) - chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0 - self.services.chat.progressLogUpdate( - chapterOperationId, - chapterProgress, - f"Section {sectionIndex + 1}/{totalSections} completed" - ) - - # Update overall progress after section completion - overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections) - self.services.chat.progressLogUpdate( - fillOperationId, - overallProgress, - f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed" - ) + # Process results in order and assign elements to sections + for (originalIndex, originalSection, _), result in zip(sectionTasks, results): + if isinstance(result, Exception): + logger.error(f"Error processing section {originalSection.get('id')}: {str(result)}") + # Set error element + originalSection["elements"] = [{ + "type": "error", + "message": f"Error processing section: {str(result)}", + "sectionId": originalSection.get("id") + }] + else: + # Assign elements to section in correct order + originalSection["elements"] = result # Finish chapter operation after all sections processed self.services.chat.progressLogFinish(chapterOperationId, True) @@ -1121,7 +1432,8 @@ class 
StructureFiller: contentPartIds: List[str], contentPartInstructions: Dict[str, Any], contentParts: List[ContentPart], - userPrompt: str + userPrompt: str, + language: str = "en" ) -> str: """Baue Prompt für Chapter-Sections-Struktur-Generierung.""" # Baue ContentParts-Index (nur IDs, keine Previews!) @@ -1144,6 +1456,8 @@ class StructureFiller: prompt = f"""TASK: Generate Chapter Sections Structure +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. + CHAPTER: {chapterTitle} (Level {chapterLevel}, ID: {chapterId}) GENERATION HINT: {generationHint} @@ -1213,7 +1527,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th generationHint: str, allSections: Optional[List[Dict[str, Any]]] = None, sectionIndex: Optional[int] = None, - isAggregation: bool = False + isAggregation: bool = False, + language: str = "en" ) -> str: """Baue Prompt für Section-Generierung mit vollständigem Kontext.""" # Filtere None-Werte @@ -1312,6 +1627,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th if isAggregation: prompt = f"""# TASK: Generate Section Content (Aggregation) +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. + ## SECTION METADATA - Section ID: {sectionId} - Content Type: {contentType} @@ -1360,6 +1677,8 @@ CRITICAL: else: prompt = f"""# TASK: Generate Section Content +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. 
+ ## SECTION METADATA - Section ID: {sectionId} - Content Type: {contentType} diff --git a/modules/services/serviceAi/subStructureGeneration.py b/modules/services/serviceAi/subStructureGeneration.py index d3b46e0e..bee83706 100644 --- a/modules/services/serviceAi/subStructureGeneration.py +++ b/modules/services/serviceAi/subStructureGeneration.py @@ -24,6 +24,20 @@ class StructureGenerator: self.services = services self.aiService = aiService + def _getUserLanguage(self) -> str: + """Get user language for document generation""" + try: + if self.services: + # Prefer detected language if available (from user intention analysis) + if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage: + return self.services.currentUserLanguage + # Fallback to user's preferred language + elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'): + return self.services.user.language + except Exception: + pass + return 'en' # Default fallback + async def generateStructure( self, userPrompt: str, @@ -76,7 +90,30 @@ class StructureGenerator: ) # Parse Struktur - structure = json.loads(self.services.utils.jsonExtractString(aiResponse)) + # Use tryParseJson which handles malformed JSON and unterminated strings + extractedJson = self.services.utils.jsonExtractString(aiResponse) + parsedJson, parseError, cleanedJson = self.services.utils.jsonTryParse(extractedJson) + + if parseError is not None: + # Try to repair broken JSON (handles unterminated strings, incomplete structures, etc.) + logger.warning(f"Initial JSON parsing failed: {str(parseError)}. 
Attempting repair...") + from modules.shared import jsonUtils + repairedJson = jsonUtils.repairBrokenJson(extractedJson) + if repairedJson: + # Try parsing repaired JSON + parsedJson, parseError, _ = self.services.utils.jsonTryParse(json.dumps(repairedJson)) + if parseError is None: + logger.info("Successfully repaired and parsed JSON structure") + structure = parsedJson + else: + logger.error(f"Failed to parse repaired JSON: {str(parseError)}") + raise ValueError(f"Failed to parse JSON structure after repair: {str(parseError)}") + else: + logger.error(f"Failed to repair JSON. Parse error: {str(parseError)}") + logger.error(f"Cleaned JSON preview (first 500 chars): {cleanedJson[:500]}") + raise ValueError(f"Failed to parse JSON structure: {str(parseError)}") + else: + structure = parsedJson # ChatLog abschließen self.services.chat.progressLogFinish(structureOperationId, True) @@ -145,11 +182,17 @@ class StructureGenerator: if not contentPartsIndex: contentPartsIndex = "\n(No content parts available)" + # Get language from services (user intention analysis) + language = self._getUserLanguage() + logger.debug(f"Using language from services (user intention analysis) for structure generation: {language}") + prompt = f"""USER REQUEST (for context): ``` {userPrompt} ``` +LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}. 
+ AVAILABLE CONTENT PARTS: {contentPartsIndex} @@ -199,7 +242,7 @@ RETURN JSON: {{ "metadata": {{ "title": "Document Title", - "language": "de" + "language": "{language}" }}, "documents": [{{ "id": "doc_1", diff --git a/modules/services/serviceGeneration/renderers/rendererHtml.py b/modules/services/serviceGeneration/renderers/rendererHtml.py index 47fecffa..1f013e50 100644 --- a/modules/services/serviceGeneration/renderers/rendererHtml.py +++ b/modules/services/serviceGeneration/renderers/rendererHtml.py @@ -417,7 +417,10 @@ class RendererHtml(BaseRenderer): if htmlParts: return '\n'.join(htmlParts) - return self._renderJsonParagraph(sectionData, styles) + # If sectionData is not a list, treat it as a dict + if isinstance(sectionData, dict): + return self._renderJsonParagraph(sectionData, styles) + return "" elif sectionType == "code_block": # Work directly with elements like other renderers if isinstance(sectionData, list) and sectionData: @@ -451,7 +454,9 @@ class RendererHtml(BaseRenderer): if htmlParts: return '\n'.join(htmlParts) # Fallback to paragraph for unknown types - return self._renderJsonParagraph(sectionData, styles) + if isinstance(sectionData, dict): + return self._renderJsonParagraph(sectionData, styles) + return "" except Exception as e: self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}") diff --git a/modules/services/serviceGeneration/renderers/rendererMarkdown.py b/modules/services/serviceGeneration/renderers/rendererMarkdown.py index 4b372bb2..84644485 100644 --- a/modules/services/serviceGeneration/renderers/rendererMarkdown.py +++ b/modules/services/serviceGeneration/renderers/rendererMarkdown.py @@ -162,7 +162,13 @@ class RendererMarkdown(BaseRenderer): return self._renderJsonHeading(element) return "" elif sectionType == "paragraph": - return self._renderJsonParagraph(sectionData) + # Work directly with elements like other renderers + if isinstance(sectionData, list) and sectionData: + element = 
sectionData[0] if isinstance(sectionData[0], dict) else {} + return self._renderJsonParagraph(element) + elif isinstance(sectionData, dict): + return self._renderJsonParagraph(sectionData) + return "" elif sectionType == "code_block": # Work directly with elements like other renderers if isinstance(sectionData, list) and sectionData: @@ -177,7 +183,12 @@ class RendererMarkdown(BaseRenderer): return "" else: # Fallback to paragraph for unknown types - return self._renderJsonParagraph(sectionData) + if isinstance(sectionData, list) and sectionData: + element = sectionData[0] if isinstance(sectionData[0], dict) else {} + return self._renderJsonParagraph(element) + elif isinstance(sectionData, dict): + return self._renderJsonParagraph(sectionData) + return "" except Exception as e: self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}") diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py index 18176a92..50f7a84c 100644 --- a/modules/services/serviceWeb/mainServiceWeb.py +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -8,6 +8,8 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL. 
import json import logging import time +import asyncio +from urllib.parse import urlparse from typing import Dict, Any, List, Optional from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl @@ -99,12 +101,18 @@ class WebService: self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") - # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering) - if len(allUrls) > maxNumberPages: - allUrls = allUrls[:maxNumberPages] + # Step 3: Validate and filter URLs before crawling + validatedUrls = self._validateUrls(allUrls) + if not validatedUrls: + logger.warning(f"All {len(allUrls)} URLs failed validation") + return {"error": "No valid URLs found to crawl"} + + # Filter to maxNumberPages (simple cut, no intelligent filtering) + if len(validatedUrls) > maxNumberPages: + validatedUrls = validatedUrls[:maxNumberPages] logger.info(f"Limited URLs to {maxNumberPages}") - if not allUrls: + if not validatedUrls: return {"error": "No URLs found to crawl"} # Step 4: Translate researchDepth to maxDepth @@ -114,14 +122,14 @@ class WebService: # Step 5: Crawl all URLs with hierarchical logging if operationId: self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") - self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") + self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs") # Use parent operation ID directly (parentId should be operationId, not log entry ID) parentOperationId = operationId # Use the parent's operationId directly crawlResult = await self._performWebCrawl( instruction=instruction, - urls=allUrls, + urls=validatedUrls, maxDepth=maxDepth, parentOperationId=parentOperationId ) @@ -194,24 +202,24 @@ class WebService: "max_depth": maxDepth, "country": countryCode, "language": languageCode, - "urls_crawled": allUrls[:20], # First 20 URLs for reference - "total_urls": 
len(allUrls), + "urls_crawled": validatedUrls[:20], # First 20 URLs for reference + "total_urls": len(validatedUrls), "urls_with_content": urlsWithContent, "total_content_length": totalContentLength, "crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None }, "sections": sections, "statistics": { - "sectionCount": len(sections), - "total_urls": len(allUrls), + "sectionCount": len(sections), + "total_urls": len(validatedUrls), "results_count": totalResults, "urls_with_content": urlsWithContent, "total_content_length": totalContentLength }, # Keep original structure for backward compatibility "instruction": instruction, - "urls_crawled": allUrls, - "total_urls": len(allUrls), + "urls_crawled": validatedUrls, + "total_urls": len(validatedUrls), "results": crawlResult, "total_results": totalResults } @@ -383,6 +391,50 @@ Return ONLY valid JSON, no additional text: logger.error(f"Error in web search: {str(e)}") return [] + def _validateUrls(self, urls: List[str]) -> List[str]: + """ + Validate URLs before crawling - filters out invalid URLs. 
+ + Args: + urls: List of URLs to validate + + Returns: + List of valid URLs + """ + validatedUrls = [] + for url in urls: + if not url or not isinstance(url, str): + logger.debug(f"Skipping invalid URL (not a string): {url}") + continue + + url = url.strip() + if not url: + logger.debug(f"Skipping empty URL") + continue + + # Basic URL validation using urlparse + try: + parsed = urlparse(url) + # Check if URL has at least scheme and netloc + if not parsed.scheme or not parsed.netloc: + logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}") + continue + + # Only allow http/https schemes + if parsed.scheme not in ['http', 'https']: + logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}") + continue + + validatedUrls.append(url) + logger.debug(f"Validated URL: {url}") + + except Exception as e: + logger.warning(f"Error validating URL '{url}': {str(e)}") + continue + + logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs") + return validatedUrls + async def _performWebCrawl( self, instruction: str, @@ -390,117 +442,165 @@ Return ONLY valid JSON, no additional text: maxDepth: int = 2, parentOperationId: Optional[str] = None ) -> List[Dict[str, Any]]: - """Perform web crawl on list of URLs - calls plugin for each URL individually.""" - crawlResults = [] - - # Loop over each URL and crawl one at a time + """Perform web crawl on list of URLs - crawls URLs in parallel for better performance.""" + # Create tasks for parallel crawling + crawlTasks = [] for urlIndex, url in enumerate(urls): - # Create separate operation for each URL with parent reference - urlOperationId = None - if parentOperationId: - workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" - urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" - self.services.chat.progressLogStart( - urlOperationId, - "Web Crawl", - f"URL {urlIndex + 1}", - url[:50] + "..." 
if len(url) > 50 else url, - parentOperationId=parentOperationId - ) - - try: - logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}") - - if urlOperationId: - displayUrl = url[:50] + "..." if len(url) > 50 else url - self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") - self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") - - # Build crawl prompt model for single URL - crawlPromptModel = AiCallPromptWebCrawl( - instruction=instruction, - url=url, # Single URL - maxDepth=maxDepth, - maxWidth=5 # Default: 5 pages per level - ) - crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) - - # Debug: persist crawl prompt (with URL identifier in content for clarity) - debugPrompt = f"URL: {url}\n\n{crawlPrompt}" - self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt") - - # Call AI with WEB_CRAWL operation - crawlOptions = AiCallOptions( - operationType=OperationTypeEnum.WEB_CRAWL, - resultFormat="json" - ) - - if urlOperationId: - self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") - - # Use unified callAiContent method with parentOperationId for hierarchical logging - crawlResponse = await self.services.ai.callAiContent( - prompt=crawlPrompt, - options=crawlOptions, - outputFormat="json", - parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging - ) - - if urlOperationId: - self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") - - # Extract content from AiResponse - crawlResult = crawlResponse.content - - # Debug: persist crawl response - if isinstance(crawlResult, str): - self.services.utils.writeDebugFile(crawlResult, "webcrawl_response") - else: - self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") - - # Parse crawl result - if isinstance(crawlResult, str): - try: - # Extract JSON from response (handles markdown code blocks) - 
extractedJson = self.services.utils.jsonExtractString(crawlResult) - crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult) - except: - crawlData = {"url": url, "content": crawlResult} - else: - crawlData = crawlResult - - # Process crawl results and create hierarchical progress logging for sub-URLs - if urlOperationId: - self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") - - # Recursively process crawl results to find nested URLs and create child operations - processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0) - - # Count total URLs crawled (including sub-URLs) for progress message - totalUrlsCrawled = self._countUrlsInResults(processedResults) - - # Ensure it's a list of results - if isinstance(processedResults, list): - crawlResults.extend(processedResults) - elif isinstance(processedResults, dict): - crawlResults.append(processedResults) - else: - crawlResults.append({"url": url, "content": str(processedResults)}) - - if urlOperationId: - if totalUrlsCrawled > 1: - self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") - else: - self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed") - self.services.chat.progressLogFinish(urlOperationId, True) - - except Exception as e: - logger.error(f"Error crawling URL {url}: {str(e)}") - if urlOperationId: - self.services.chat.progressLogFinish(urlOperationId, False) - crawlResults.append({"url": url, "error": str(e)}) + task = self._crawlSingleUrl( + url=url, + urlIndex=urlIndex, + totalUrls=len(urls), + instruction=instruction, + maxDepth=maxDepth, + parentOperationId=parentOperationId + ) + crawlTasks.append(task) - return crawlResults + # Execute all crawl tasks in parallel + logger.info(f"Starting parallel crawl of {len(urls)} URLs") + crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True) + + # Process 
results and handle exceptions + processedResults = [] + for idx, result in enumerate(crawlResults): + if isinstance(result, Exception): + logger.error(f"Error crawling URL {urls[idx]}: {str(result)}") + processedResults.append({"url": urls[idx], "error": str(result)}) + else: + processedResults.extend(result if isinstance(result, list) else [result]) + + logger.info(f"Completed parallel crawl: {len(processedResults)} results") + return processedResults + + async def _crawlSingleUrl( + self, + url: str, + urlIndex: int, + totalUrls: int, + instruction: str, + maxDepth: int, + parentOperationId: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Crawl a single URL - called in parallel for multiple URLs. + + Args: + url: URL to crawl + urlIndex: Index of URL in the list + totalUrls: Total number of URLs being crawled + instruction: Research instruction + maxDepth: Maximum crawl depth + parentOperationId: Parent operation ID for progress tracking + + Returns: + List of crawl results for this URL + """ + # Create separate operation for each URL with parent reference + urlOperationId = None + if parentOperationId: + workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" + urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" + self.services.chat.progressLogStart( + urlOperationId, + "Web Crawl", + f"URL {urlIndex + 1}/{totalUrls}", + url[:50] + "..." if len(url) > 50 else url, + parentOperationId=parentOperationId + ) + + try: + logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}") + + if urlOperationId: + displayUrl = url[:50] + "..." 
if len(url) > 50 else url + self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") + self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") + + # Build crawl prompt model for single URL + crawlPromptModel = AiCallPromptWebCrawl( + instruction=instruction, + url=url, # Single URL + maxDepth=maxDepth, + maxWidth=5 # Default: 5 pages per level + ) + crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) + + # Debug: persist crawl prompt (with URL identifier in content for clarity) + debugPrompt = f"URL: {url}\n\n{crawlPrompt}" + self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt") + + # Call AI with WEB_CRAWL operation + crawlOptions = AiCallOptions( + operationType=OperationTypeEnum.WEB_CRAWL, + resultFormat="json" + ) + + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") + + # Use unified callAiContent method with parentOperationId for hierarchical logging + crawlResponse = await self.services.ai.callAiContent( + prompt=crawlPrompt, + options=crawlOptions, + outputFormat="json", + parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging + ) + + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") + + # Extract content from AiResponse + crawlResult = crawlResponse.content + + # Debug: persist crawl response + if isinstance(crawlResult, str): + self.services.utils.writeDebugFile(crawlResult, "webcrawl_response") + else: + self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") + + # Parse crawl result + if isinstance(crawlResult, str): + try: + # Extract JSON from response (handles markdown code blocks) + extractedJson = self.services.utils.jsonExtractString(crawlResult) + crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult) + except: + crawlData = {"url": url, "content": 
crawlResult} + else: + crawlData = crawlResult + + # Process crawl results and create hierarchical progress logging for sub-URLs + if urlOperationId: + self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") + + # Recursively process crawl results to find nested URLs and create child operations + processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0) + + # Count total URLs crawled (including sub-URLs) for progress message + totalUrlsCrawled = self._countUrlsInResults(processedResults) + + # Ensure it's a list of results + if isinstance(processedResults, list): + results = processedResults + elif isinstance(processedResults, dict): + results = [processedResults] + else: + results = [{"url": url, "content": str(processedResults)}] + + if urlOperationId: + if totalUrlsCrawled > 1: + self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") + else: + self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed") + self.services.chat.progressLogFinish(urlOperationId, True) + + return results + + except Exception as e: + logger.error(f"Error crawling URL {url}: {str(e)}") + if urlOperationId: + self.services.chat.progressLogFinish(urlOperationId, False) + return [{"url": url, "error": str(e)}] def _processCrawlResultsWithHierarchy( self, diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py index f7dcecac..bd8b9779 100644 --- a/modules/shared/progressLogger.py +++ b/modules/shared/progressLogger.py @@ -132,13 +132,17 @@ class ProgressLogger: return None op = self.activeOperations[operationId] - message = f"Service {op['service']}" + message = f"{op['service']}" workflow = self.services.workflow if not workflow: logger.warning(f"Cannot log progress: no workflow available") return None + # Validate parentOperationId exists in activeOperations (for debugging) + if parentOperationId and 
parentOperationId not in self.activeOperations: + logger.debug(f"WARNING: Parent operation '{parentOperationId}' not found in activeOperations when creating log for '{operationId}'. Available operations: {list(self.activeOperations.keys())}. Child operation may appear at root level.") + # parentId in ChatLog should be the operationId of the parent operation, not the log entry ID logData = { "workflowId": workflow.id, diff --git a/modules/workflows/methods/methodAi/actions/process.py b/modules/workflows/methods/methodAi/actions/process.py index 5abc57cd..807c1a64 100644 --- a/modules/workflows/methods/methodAi/actions/process.py +++ b/modules/workflows/methods/methodAi/actions/process.py @@ -38,6 +38,11 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult: # Start progress tracking parentOperationId = parameters.get('parentOperationId') + if not parentOperationId: + logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}") + else: + logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'") + self.services.chat.progressLogStart( operationId, "Generate", diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py index a20f5ec1..7934ea19 100644 --- a/modules/workflows/methods/methodBase.py +++ b/modules/workflows/methods/methodBase.py @@ -188,9 +188,19 @@ class MethodBase: return wrapper def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]: - """Validate parameters against definitions""" + """Validate parameters against definitions + + IMPORTANT: System parameters (like parentOperationId, expectedDocumentFormats) are preserved + even if they're not in the parameter definitions, as they're used internally by the framework. 
+ """ validated = {} + # System parameters that should always be preserved, even if not in paramDefs + systemParams = ['parentOperationId', 'expectedDocumentFormats'] + for sysParam in systemParams: + if sysParam in parameters: + validated[sysParam] = parameters[sysParam] + for paramName, paramDef in paramDefs.items(): value = parameters.get(paramName)