From a958defd42a2f5917d46101117d2bde7ed1bd101 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Mon, 29 Dec 2025 23:54:27 +0100
Subject: [PATCH 1/5] fixed workflow run with chatlog
---
.../services/serviceAi/subStructureFilling.py | 102 ++++++++++++------
modules/shared/progressLogger.py | 4 +
.../methods/methodAi/actions/process.py | 5 +
modules/workflows/methods/methodBase.py | 12 ++-
4 files changed, 87 insertions(+), 36 deletions(-)
diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py
index 7089103c..6e3377a1 100644
--- a/modules/services/serviceAi/subStructureFilling.py
+++ b/modules/services/serviceAi/subStructureFilling.py
@@ -437,19 +437,26 @@ class StructureFiller:
if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
elements.append(jsonContent)
logger.debug("AI returned proper JSON image structure")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
elif isinstance(jsonContent, list) and len(jsonContent) > 0:
# Check if first element is an image
if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
elements.extend(jsonContent)
logger.debug("AI returned proper JSON image structure in list")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
+ else:
+ base64Data = "" # Continue with normal processing
except (json.JSONDecodeError, ValueError, AttributeError):
# Not JSON, treat as base64 string or data URI
- pass
+ base64Data = "" # Will be processed below
- # Already base64 string or data URI
- if aiResponse.content.startswith("data:image/"):
+ # Process base64 if not already handled above
+ if base64Data is None:
+ # Already processed as JSON, skip base64 processing
+ pass
+ elif aiResponse.content.startswith("data:image/"):
# Extract base64 from data URI
base64Data = aiResponse.content.split(",", 1)[1]
else:
@@ -463,8 +470,11 @@ class StructureFiller:
else:
base64Data = ""
- # Always create proper JSON structure for images
- if base64Data:
+ # Always create proper JSON structure for images (if not already processed)
+ if base64Data is None:
+ # Image already processed as JSON, skip
+ pass
+ elif base64Data:
elements.append({
"type": "image",
"content": {
@@ -527,20 +537,20 @@ class StructureFiller:
except Exception as e:
# Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!)
self.services.chat.progressLogFinish(sectionOperationId, False)
- elements.append({
- "type": "error",
- "message": f"Error generating section {sectionId}: {str(e)}",
- "sectionId": sectionId
- })
- logger.error(f"Error generating section {sectionId}: {str(e)}")
- # Still update chapter progress even on error
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
- )
- # NICHT raise - Section wird mit Fehlermeldung gerendert
+ elements.append({
+ "type": "error",
+ "message": f"Error generating section {sectionId}: {str(e)}",
+ "sectionId": sectionId
+ })
+ logger.error(f"Error generating section {sectionId}: {str(e)}")
+ # Still update chapter progress even on error
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
+ )
+ # NICHT raise - Section wird mit Fehlermeldung gerendert
else:
# Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts
@@ -634,17 +644,25 @@ class StructureFiller:
if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
elements.append(jsonContent)
logger.debug("AI returned proper JSON image structure")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
elif isinstance(jsonContent, list) and len(jsonContent) > 0:
if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
elements.extend(jsonContent)
logger.debug("AI returned proper JSON image structure in list")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
+ else:
+ base64Data = "" # Continue with normal processing
except (json.JSONDecodeError, ValueError, AttributeError):
- pass
+ base64Data = "" # Will be processed below
- # Already base64 string or data URI
- if aiResponse.content.startswith("data:image/"):
+ # Process base64 if not already handled above
+ if base64Data is None:
+ # Already processed as JSON, skip base64 processing
+ pass
+ elif aiResponse.content.startswith("data:image/"):
+ # Extract base64 from data URI
base64Data = aiResponse.content.split(",", 1)[1]
else:
content_stripped = aiResponse.content.strip()
@@ -655,8 +673,11 @@ class StructureFiller:
else:
base64Data = ""
- # Always create proper JSON structure for images
- if base64Data:
+ # Always create proper JSON structure for images (if not already processed)
+ if base64Data is None:
+ # Image already processed as JSON, skip
+ pass
+ elif base64Data:
elements.append({
"type": "image",
"content": {
@@ -853,17 +874,25 @@ class StructureFiller:
if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
elements.append(jsonContent)
logger.debug("AI returned proper JSON image structure")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
elif isinstance(jsonContent, list) and len(jsonContent) > 0:
if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
elements.extend(jsonContent)
logger.debug("AI returned proper JSON image structure in list")
- continue
+ # Skip remaining image processing, but continue with progress updates
+ base64Data = None # Signal that image was already processed
+ else:
+ base64Data = "" # Continue with normal processing
except (json.JSONDecodeError, ValueError, AttributeError):
- pass
+ base64Data = "" # Will be processed below
- # Already base64 string or data URI
- if aiResponse.content.startswith("data:image/"):
+ # Process base64 if not already handled above
+ if base64Data is None:
+ # Already processed as JSON, skip base64 processing
+ pass
+ elif aiResponse.content.startswith("data:image/"):
+ # Extract base64 from data URI
base64Data = aiResponse.content.split(",", 1)[1]
else:
content_stripped = aiResponse.content.strip()
@@ -874,8 +903,11 @@ class StructureFiller:
else:
base64Data = ""
- # Always create proper JSON structure for images
- if base64Data:
+ # Always create proper JSON structure for images (if not already processed)
+ if base64Data is None:
+ # Image already processed as JSON, skip
+ pass
+ elif base64Data:
elements.append({
"type": "image",
"content": {
diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py
index f7dcecac..8c6e56f8 100644
--- a/modules/shared/progressLogger.py
+++ b/modules/shared/progressLogger.py
@@ -139,6 +139,10 @@ class ProgressLogger:
logger.warning(f"Cannot log progress: no workflow available")
return None
+ # Validate parentOperationId exists in activeOperations (for debugging)
+ if parentOperationId and parentOperationId not in self.activeOperations:
+ logger.debug(f"WARNING: Parent operation '{parentOperationId}' not found in activeOperations when creating log for '{operationId}'. Available operations: {list(self.activeOperations.keys())}. Child operation may appear at root level.")
+
# parentId in ChatLog should be the operationId of the parent operation, not the log entry ID
logData = {
"workflowId": workflow.id,
diff --git a/modules/workflows/methods/methodAi/actions/process.py b/modules/workflows/methods/methodAi/actions/process.py
index 5abc57cd..807c1a64 100644
--- a/modules/workflows/methods/methodAi/actions/process.py
+++ b/modules/workflows/methods/methodAi/actions/process.py
@@ -38,6 +38,11 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult:
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
+ if not parentOperationId:
+ logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
+ else:
+ logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")
+
self.services.chat.progressLogStart(
operationId,
"Generate",
diff --git a/modules/workflows/methods/methodBase.py b/modules/workflows/methods/methodBase.py
index a20f5ec1..7934ea19 100644
--- a/modules/workflows/methods/methodBase.py
+++ b/modules/workflows/methods/methodBase.py
@@ -188,9 +188,19 @@ class MethodBase:
return wrapper
def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
- """Validate parameters against definitions"""
+ """Validate parameters against definitions
+
+ IMPORTANT: System parameters (like parentOperationId, expectedDocumentFormats) are preserved
+ even if they're not in the parameter definitions, as they're used internally by the framework.
+ """
validated = {}
+ # System parameters that should always be preserved, even if not in paramDefs
+ systemParams = ['parentOperationId', 'expectedDocumentFormats']
+ for sysParam in systemParams:
+ if sysParam in parameters:
+ validated[sysParam] = parameters[sysParam]
+
for paramName, paramDef in paramDefs.items():
value = parameters.get(paramName)
From fa57d3683b17e30a9fc649899344cb0f0aecc188 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Dec 2025 02:06:51 +0100
Subject: [PATCH 2/5] parallel processing for rendering
---
.../serviceAi/PARALLEL_PROCESSING_CONCEPT.md | 376 ++++
.../services/serviceAi/REFACTORING_PLAN.md | 126 --
modules/services/serviceAi/mainServiceAi.py | 1 +
.../services/serviceAi/subAiCallLooping.py | 34 +-
.../services/serviceAi/subStructureFilling.py | 1926 ++++++++++-------
.../serviceAi/subStructureGeneration.py | 31 +-
6 files changed, 1542 insertions(+), 952 deletions(-)
create mode 100644 modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md
delete mode 100644 modules/services/serviceAi/REFACTORING_PLAN.md
diff --git a/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md
new file mode 100644
index 00000000..d8b55298
--- /dev/null
+++ b/modules/services/serviceAi/PARALLEL_PROCESSING_CONCEPT.md
@@ -0,0 +1,376 @@
+# Parallel Processing Refactoring Concept
+
+## Current State (Sequential)
+
+### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`)
+- **Current**: Processes chapters sequentially, one after another
+- **Flow**:
+ 1. Iterate through documents
+ 2. For each document, iterate through chapters
+ 3. For each chapter, generate sections structure using AI
+ 4. Update progress after each chapter
+
+### Section Content Generation (`_fillChapterSections`)
+- **Current**: Processes chapters sequentially, sections within each chapter sequentially
+- **Flow**:
+ 1. Iterate through documents
+ 2. For each document, iterate through chapters
+ 3. For each chapter, iterate through sections
+ 4. For each section, generate content using AI
+ 5. Update progress after each section
+
+## Desired State (Parallel)
+
+### Chapter Sections Structure Generation
+- **Target**: Process all chapters in parallel
+- **Requirements**:
+ - Maintain chapter order in final result
+ - Each chapter can be processed independently
+ - Progress updates should reflect parallel processing
+ - Errors in one chapter should not stop others
+
+### Section Content Generation
+- **Target**: Process sections within each chapter in parallel
+- **Requirements**:
+ - Maintain section order within each chapter
+ - Sections within a chapter can be processed independently
+ - Chapters still processed sequentially (to maintain order)
+ - Progress updates should reflect parallel processing
+ - Errors in one section should not stop others
+
+## Implementation Strategy
+
+### Phase 1: Chapter Sections Structure Generation Parallelization
+
+#### Step 1.1: Extract Single Chapter Processing
+- **Create**: `_generateSingleChapterSectionsStructure()` method
+- **Purpose**: Process one chapter independently
+- **Parameters**:
+ - `chapter`: Chapter dict
+ - `chapterIndex`: Index for ordering
+ - `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata
+ - `generationHint`: Generation instructions
+ - `contentPartIds`, `contentPartInstructions`: Content part info
+ - `contentParts`: Full content parts list
+ - `userPrompt`: User's original prompt
+ - `language`: Language for generation
+ - `parentOperationId`: For progress logging
+- **Returns**: None (modifies chapter dict in place)
+- **Error Handling**: Logs errors, raises exception to be caught by caller
+
+#### Step 1.2: Refactor Main Method
+- **Modify**: `_generateChapterSectionsStructure()`
+- **Changes**:
+ 1. Collect all chapters with their indices
+ 2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure`
+ 3. Use `asyncio.gather()` to execute all tasks in parallel
+ 4. Process results in order (using `zip` with original order)
+ 5. Handle errors per chapter (don't fail entire operation)
+ 6. Update progress after each chapter completes
+
+#### Step 1.3: Progress Reporting
+- **Maintain**: Overall progress tracking
+- **Update**: Progress after each chapter completes (not sequentially)
+- **Format**: "Chapter X/Y completed" or "Chapter X/Y error"
+
+### Phase 2: Section Content Generation Parallelization
+
+#### Step 2.1: Extract Single Section Processing
+- **Create**: `_processSingleSection()` method
+- **Purpose**: Process one section independently
+- **Parameters**:
+ - `section`: Section dict
+ - `sectionIndex`: Index for ordering
+ - `totalSections`: Total sections in chapter
+ - `chapterIndex`: Chapter index
+ - `totalChapters`: Total chapters
+ - `chapterId`: Chapter ID
+ - `chapterOperationId`: Chapter progress operation ID
+ - `fillOperationId`: Overall fill operation ID
+ - `contentParts`: Full content parts list
+ - `userPrompt`: User's original prompt
+ - `all_sections_list`: All sections for context
+ - `language`: Language for generation
+ - `calculateOverallProgress`: Function to calculate overall progress
+- **Returns**: `List[Dict[str, Any]]` (elements for the section)
+- **Error Handling**: Returns error element instead of raising
+
+#### Step 2.2: Extract Section Processing Logic
+- **Create**: Helper methods for different processing paths:
+ - `_processSectionAggregation()`: Handle aggregation path (multiple parts)
+ - `_processSectionGeneration()`: Handle generation without parts (only generationHint)
+ - `_processSectionParts()`: Handle individual part processing
+- **Purpose**: Keep logic organized and reusable
+
+#### Step 2.3: Refactor Main Method
+- **Modify**: `_fillChapterSections()`
+- **Changes**:
+ 1. Keep sequential chapter processing (maintains order)
+ 2. For each chapter, collect all sections with indices
+ 3. Create async tasks for each section using `_processSingleSection`
+ 4. Use `asyncio.gather()` to execute all section tasks in parallel
+ 5. Process results in order (using `zip` with original order)
+ 6. Assign elements to sections in correct order
+ 7. Update progress after each section completes
+ 8. Handle errors per section (don't fail entire chapter)
+
+#### Step 2.4: Progress Reporting
+- **Maintain**: Hierarchical progress tracking
+- **Update**:
+ - Section progress: After each section completes
+ - Chapter progress: After all sections in chapter complete
+ - Overall progress: After each section/chapter completes
+- **Format**: "Chapter X/Y, Section A/B completed"
+
+## Key Considerations
+
+### Order Preservation
+- **Chapters**: Must maintain document order → structure generation runs chapters in parallel but gathers results in original order; content filling keeps chapters sequential
+- **Sections**: Must maintain section order within each chapter → run sections in parallel, then gather results in original order
+- **Solution**: Use `asyncio.gather()` with ordered task list, then `zip` results with original order
+
+### Error Handling
+- **Chapters**: Error in one chapter should not stop others
+- **Sections**: Error in one section should not stop others
+- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)`
+
+### Progress Reporting
+- **Challenge**: Progress updates happen out of order
+- **Solution**: Update progress when each task completes, not sequentially
+- **Format**: Show completed count, not sequential position
+
+### Shared State
+- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent)
+- **Sections**: Return elements, assign to sections in order (safe, each section is independent)
+- **Content Parts**: Read-only, passed to all tasks (safe)
+
+### Dependencies
+- **Chapters**: No dependencies between chapters
+- **Sections**: No dependencies between sections (each is self-contained)
+- **Solution**: All tasks can run truly in parallel
+
+## Implementation Steps
+
+### Step 1: Clean Current Code
+1. Ensure current sequential implementation is correct
+2. Fix any existing bugs
+3. Verify all tests pass
+
+### Step 2: Implement Chapter Parallelization
+1. Create `_generateSingleChapterSectionsStructure()` method
+2. Extract chapter processing logic
+3. Refactor `_generateChapterSectionsStructure()` to use parallel processing
+4. Test with single chapter
+5. Test with multiple chapters
+6. Verify order preservation
+7. Verify error handling
+
+### Step 3: Implement Section Parallelization
+1. Create `_processSingleSection()` method
+2. Extract section processing logic into helper methods
+3. Refactor `_fillChapterSections()` to use parallel processing for sections
+4. Test with single section
+5. Test with multiple sections
+6. Test with multiple chapters
+7. Verify order preservation
+8. Verify error handling
+
+### Step 4: Testing & Validation
+1. Test with various document structures
+2. Test error scenarios
+3. Verify progress reporting accuracy
+4. Performance testing (compare sequential vs parallel)
+5. Verify final output order matches input order
+
+## Code Structure
+
+### New Methods to Create
+
+```python
+async def _generateSingleChapterSectionsStructure(
+ self,
+ chapter: Dict[str, Any],
+ chapterIndex: int,
+ chapterId: str,
+ chapterLevel: int,
+ chapterTitle: str,
+ generationHint: str,
+ contentPartIds: List[str],
+ contentPartInstructions: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ language: str,
+ parentOperationId: str
+) -> None:
+ """Generate sections structure for a single chapter (used for parallel processing)."""
+ # Extract logic from current sequential loop
+ # Modify chapter dict in place
+ # Handle errors internally, raise if critical
+
+async def _processSingleSection(
+ self,
+ section: Dict[str, Any],
+ sectionIndex: int,
+ totalSections: int,
+ chapterIndex: int,
+ totalChapters: int,
+ chapterId: str,
+ chapterOperationId: str,
+ fillOperationId: str,
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ all_sections_list: List[Dict[str, Any]],
+ language: str,
+ calculateOverallProgress: Callable
+) -> List[Dict[str, Any]]:
+ """Process a single section and return its elements."""
+ # Extract logic from current sequential loop
+ # Return elements list
+ # Return error element on failure (don't raise)
+
+async def _processSectionAggregation(
+ self,
+ section: Dict[str, Any],
+ sectionId: str,
+ sectionTitle: str,
+ sectionIndex: int,
+ totalSections: int,
+ chapterId: str,
+ chapterOperationId: str,
+ fillOperationId: str,
+ contentPartIds: List[str],
+ contentFormats: Dict[str, str],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ generationHint: str,
+ all_sections_list: List[Dict[str, Any]],
+ language: str
+) -> List[Dict[str, Any]]:
+ """Process section with aggregation (multiple parts together)."""
+ # Extract aggregation logic
+ # Return elements list
+
+async def _processSectionGeneration(
+ self,
+ section: Dict[str, Any],
+ sectionId: str,
+ sectionTitle: str,
+ sectionIndex: int,
+ totalSections: int,
+ chapterId: str,
+ chapterOperationId: str,
+ fillOperationId: str,
+ contentType: str,
+ userPrompt: str,
+ generationHint: str,
+ all_sections_list: List[Dict[str, Any]],
+ language: str
+) -> List[Dict[str, Any]]:
+ """Process section generation without content parts (only generationHint)."""
+ # Extract generation logic
+ # Return elements list
+
+async def _processSectionParts(
+ self,
+ section: Dict[str, Any],
+ sectionId: str,
+ sectionTitle: str,
+ sectionIndex: int,
+ totalSections: int,
+ chapterId: str,
+ chapterOperationId: str,
+ fillOperationId: str,
+ contentPartIds: List[str],
+ contentFormats: Dict[str, str],
+ contentParts: List[ContentPart],
+ contentType: str,
+ useAiCall: bool,
+ generationHint: str,
+ userPrompt: str,
+ all_sections_list: List[Dict[str, Any]],
+ language: str
+) -> List[Dict[str, Any]]:
+ """Process individual parts in a section."""
+ # Extract individual part processing logic
+ # Return elements list
+```
+
+### Modified Methods
+
+```python
+async def _generateChapterSectionsStructure(
+ self,
+ chapterStructure: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ parentOperationId: str
+) -> Dict[str, Any]:
+ """Generate sections structure for all chapters in parallel."""
+ # Collect chapters with indices
+ # Create tasks
+ # Execute in parallel
+ # Process results in order
+ # Update progress
+
+async def _fillChapterSections(
+ self,
+ chapterStructure: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ fillOperationId: str
+) -> Dict[str, Any]:
+ """Fill sections with content, processing sections in parallel within each chapter."""
+ # Process chapters sequentially
+ # For each chapter, process sections in parallel
+ # Maintain order
+ # Update progress
+```
+
+## Testing Strategy
+
+### Unit Tests
+1. Test `_generateSingleChapterSectionsStructure` independently
+2. Test `_processSingleSection` independently
+3. Test helper methods independently
+
+### Integration Tests
+1. Test parallel chapter processing with multiple chapters
+2. Test parallel section processing with multiple sections
+3. Test error handling (one chapter/section fails)
+4. Test order preservation
+
+### Performance Tests
+1. Measure sequential vs parallel execution time
+2. Verify parallel processing is faster
+3. Check resource usage (memory, CPU)
+
+## Risk Mitigation
+
+### Risks
+1. **Order not preserved**: Use `zip` with original order
+2. **Race conditions**: No shared mutable state between tasks
+3. **Progress reporting incorrect**: Update progress when tasks complete
+4. **Errors not handled**: Use `return_exceptions=True` and check results
+5. **Performance degradation**: Test and measure, fallback to sequential if needed
+
+### Safety Measures
+1. Keep sequential implementation as fallback (commented out)
+2. Add feature flag to enable/disable parallel processing
+3. Extensive logging for debugging
+4. Gradual rollout (test with small datasets first)
+
+## Migration Path
+
+1. **Phase 1**: Implement chapter parallelization, test thoroughly
+2. **Phase 2**: Implement section parallelization, test thoroughly
+3. **Phase 3**: Enable both in production with monitoring
+4. **Phase 4**: Remove sequential fallback code (if stable)
+
+## Notes
+
+- All async methods must use `await` correctly
+- Progress updates happen asynchronously (may appear out of order in logs)
+- Final result order is guaranteed by processing results in order
+- Error handling is per-task, not global
+- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts)
+
diff --git a/modules/services/serviceAi/REFACTORING_PLAN.md b/modules/services/serviceAi/REFACTORING_PLAN.md
deleted file mode 100644
index 2ce7a717..00000000
--- a/modules/services/serviceAi/REFACTORING_PLAN.md
+++ /dev/null
@@ -1,126 +0,0 @@
-# Refactoring Plan für mainServiceAi.py
-
-## Ziel
-Aufteilen des 3000-Zeilen-Moduls in überschaubare Submodule (~300-600 Zeilen pro Modul).
-
-## Vorgeschlagene Struktur
-
-### Bereits erstellt:
-1. ✅ `subResponseParsing.py` - ResponseParser Klasse
-2. ✅ `subDocumentIntents.py` - DocumentIntentAnalyzer Klasse
-
-### Noch zu erstellen:
-3. `subContentExtraction.py` - ContentExtractor Klasse
- - `extractAndPrepareContent()` (~490 Zeilen)
- - `extractTextFromImage()` (~55 Zeilen)
- - `processTextContentWithAi()` (~72 Zeilen)
- - `_isBinary()` (~10 Zeilen)
-
-4. `subStructureGeneration.py` - StructureGenerator Klasse
- - `generateStructure()` (~60 Zeilen)
- - `_buildStructurePrompt()` (~130 Zeilen)
-
-5. `subStructureFilling.py` - StructureFiller Klasse
- - `fillStructure()` (~290 Zeilen)
- - `_buildSectionGenerationPrompt()` (~185 Zeilen)
- - `_findContentPartById()` (~5 Zeilen)
- - `_needsAggregation()` (~20 Zeilen)
-
-6. `subAiCallLooping.py` - AiCallLooper Klasse
- - `callAiWithLooping()` (~405 Zeilen)
- - `_defineKpisFromPrompt()` (~92 Zeilen)
-
-## Refactoring-Schritte für mainServiceAi.py
-
-### Schritt 1: Submodule-Initialisierung erweitern
-
-```python
-def _initializeSubmodules(self):
- """Initialize all submodules after aiObjects is ready."""
- if self.aiObjects is None:
- raise RuntimeError("aiObjects must be initialized before initializing submodules")
-
- if self.extractionService is None:
- logger.info("Initializing ExtractionService...")
- self.extractionService = ExtractionService(self.services)
-
- # Neue Submodule initialisieren
- from modules.services.serviceAi.subResponseParsing import ResponseParser
- from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer
- from modules.services.serviceAi.subContentExtraction import ContentExtractor
- from modules.services.serviceAi.subStructureGeneration import StructureGenerator
- from modules.services.serviceAi.subStructureFilling import StructureFiller
-
- if not hasattr(self, 'responseParser'):
- self.responseParser = ResponseParser(self.services)
-
- if not hasattr(self, 'intentAnalyzer'):
- self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self)
-
- if not hasattr(self, 'contentExtractor'):
- self.contentExtractor = ContentExtractor(self.services, self)
-
- if not hasattr(self, 'structureGenerator'):
- self.structureGenerator = StructureGenerator(self.services, self)
-
- if not hasattr(self, 'structureFiller'):
- self.structureFiller = StructureFiller(self.services, self)
-```
-
-### Schritt 2: Methoden durch Delegation ersetzen
-
-**Beispiel für Response Parsing:**
-```python
-# ALT:
-def _extractSectionsFromResponse(self, ...):
- # 100 Zeilen Code
- ...
-
-# NEU:
-def _extractSectionsFromResponse(self, ...):
- return self.responseParser.extractSectionsFromResponse(...)
-```
-
-**Beispiel für Document Intents:**
-```python
-# ALT:
-async def _clarifyDocumentIntents(self, ...):
- # 100 Zeilen Code
- ...
-
-# NEU:
-async def _clarifyDocumentIntents(self, ...):
- return await self.intentAnalyzer.clarifyDocumentIntents(...)
-```
-
-### Schritt 3: Helper-Methoden beibehalten
-
-Kleine Helper-Methoden bleiben im Hauptmodul:
-- `_buildPromptWithPlaceholders()`
-- `_getIntentForDocument()`
-- `_shouldSkipContentPart()`
-- `_determineDocumentName()`
-
-### Schritt 4: Public API unverändert lassen
-
-Die öffentliche API (`callAiPlanning`, `callAiContent`) bleibt unverändert.
-
-## Erwartete Ergebnis-Größen
-
-- `mainServiceAi.py`: ~800-1000 Zeilen (von 3016)
-- `subResponseParsing.py`: ~200 Zeilen ✅
-- `subDocumentIntents.py`: ~300 Zeilen ✅
-- `subContentExtraction.py`: ~600 Zeilen
-- `subStructureGeneration.py`: ~200 Zeilen
-- `subStructureFilling.py`: ~400 Zeilen
-- `subAiCallLooping.py`: ~500 Zeilen
-
-**Gesamt: ~3000 Zeilen** (gleich, aber besser organisiert)
-
-## Vorteile
-
-1. **Übersichtlichkeit**: Jedes Modul hat eine klare Verantwortlichkeit
-2. **Wartbarkeit**: Änderungen sind lokalisiert
-3. **Testbarkeit**: Module können einzeln getestet werden
-4. **Wiederverwendbarkeit**: Module können in anderen Kontexten verwendet werden
-
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 9839093d..65bae155 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -676,6 +676,7 @@ Respond with ONLY a JSON object in this exact format:
)
# Schritt 5D: Fülle Struktur
+ # Language will be extracted from services (user intention analysis) in fillStructure
filledStructure = await self._fillStructure(
structure,
contentParts or [],
diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py
index 8ebafd23..bb1824c2 100644
--- a/modules/services/serviceAi/subAiCallLooping.py
+++ b/modules/services/serviceAi/subAiCallLooping.py
@@ -14,7 +14,7 @@ from typing import Dict, Any, List, Optional, Callable
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState
from modules.datamodels.datamodelExtraction import ContentPart
-from modules.shared.jsonUtils import buildContinuationContext, extractJsonString
+from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
logger = logging.getLogger(__name__)
@@ -192,6 +192,38 @@ class AiCallLooper:
# Store raw response for continuation (even if broken)
lastRawResponse = result
+ # Check if this is section content generation (has "elements" not "sections")
+ # Section content generation returns JSON with "elements" array, not document structure with "sections"
+ isSectionContentGeneration = False
+ parsedJsonForSection = None
+ extractedJsonForSection = None
+ try:
+ extractedJsonForSection = extractJsonString(result)
+ parsedJson, parseError, _ = tryParseJson(extractedJsonForSection)
+ if parseError is None and parsedJson:
+ parsedJsonForSection = parsedJson
+ # Check if JSON has "elements" (section content) or "sections" (document structure)
+ if isinstance(parsedJson, dict):
+ if "elements" in parsedJson:
+ isSectionContentGeneration = True
+ elif isinstance(parsedJson, list) and len(parsedJson) > 0:
+ # Check if it's a list of elements (section content format)
+ if isinstance(parsedJson[0], dict) and "type" in parsedJson[0]:
+ isSectionContentGeneration = True
+ except Exception:
+ pass
+
+ if isSectionContentGeneration:
+ # This is section content generation - return the JSON directly
+ # No need to extract sections, just return the complete JSON string
+ logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
+ if iterationOperationId:
+ self.services.chat.progressLogFinish(iterationOperationId, True)
+ # Write final result
+ final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
+ self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result")
+ return final_json
+
# Extract sections from response (handles both valid and broken JSON)
# Only for document generation (JSON responses)
# CRITICAL: Pass allSections and accumulationState to enable string accumulation
diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py
index 6e3377a1..75642b48 100644
--- a/modules/services/serviceAi/subStructureFilling.py
+++ b/modules/services/serviceAi/subStructureFilling.py
@@ -11,6 +11,7 @@ Handles filling document structure with content, including:
import json
import logging
import copy
+import asyncio
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelExtraction import ContentPart
@@ -27,12 +28,27 @@ class StructureFiller:
self.services = services
self.aiService = aiService
+ def _getUserLanguage(self) -> str:
+ """Get user language for document generation"""
+ try:
+ if self.services:
+ # Prefer detected language if available (from user intention analysis)
+ if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage:
+ return self.services.currentUserLanguage
+ # Fallback to user's preferred language
+ elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'):
+ return self.services.user.language
+ except Exception:
+ pass
+ return 'en' # Default fallback
+
async def fillStructure(
self,
structure: Dict[str, Any],
contentParts: List[ContentPart],
userPrompt: str,
- parentOperationId: str
+ parentOperationId: str,
+ language: Optional[str] = None
) -> Dict[str, Any]:
"""
Phase 5D: Chapter-Content-Generierung (Zwei-Phasen-Ansatz).
@@ -45,6 +61,7 @@ class StructureFiller:
contentParts: Alle vorbereiteten ContentParts
userPrompt: User-Anfrage
parentOperationId: Parent Operation-ID für ChatLog-Hierarchie
+ language: Language identified from user intention analysis (e.g., "de", "en", "fr")
Returns:
Gefüllte Struktur mit elements in jeder Section (nach Flattening)
@@ -64,6 +81,13 @@ class StructureFiller:
logger.error(error_msg)
raise ValueError(error_msg)
+ # Get language from services (user intention analysis) or parameter
+ if language is None:
+ language = self._getUserLanguage()
+ logger.debug(f"Using language from services (user intention analysis): {language}")
+ else:
+ logger.debug(f"Using provided language parameter: {language}")
+
# Starte ChatLog mit Parent-Referenz
chapterCount = sum(len(doc.get("chapters", [])) for doc in structure.get("documents", []))
self.services.chat.progressLogStart(
@@ -79,12 +103,12 @@ class StructureFiller:
# Phase 5D.1: Sections-Struktur für jedes Chapter generieren
filledStructure = await self._generateChapterSectionsStructure(
- filledStructure, contentParts, userPrompt, fillOperationId
+ filledStructure, contentParts, userPrompt, fillOperationId, language
)
# Phase 5D.2: Sections mit ContentParts füllen
filledStructure = await self._fillChapterSections(
- filledStructure, contentParts, userPrompt, fillOperationId
+ filledStructure, contentParts, userPrompt, fillOperationId, language
)
# Flattening: Chapters zu Sections konvertieren
@@ -103,19 +127,133 @@ class StructureFiller:
logger.error(f"Error in fillStructure: {str(e)}")
raise
+ async def _generateSingleChapterSectionsStructure(
+ self,
+ chapter: Dict[str, Any],
+ chapterIndex: int,
+ chapterId: str,
+ chapterLevel: int,
+ chapterTitle: str,
+ generationHint: str,
+ contentPartIds: List[str],
+ contentPartInstructions: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ language: str,
+ parentOperationId: str,
+ totalChapters: int
+ ) -> None:
+ """
+ Generate sections structure for a single chapter (used for parallel processing).
+ Modifies chapter dict in place.
+ """
+ try:
+ # Update progress for chapter structure generation
+ progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ parentOperationId,
+ progress,
+ f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}"
+ )
+
+ chapterPrompt = self._buildChapterSectionsStructurePrompt(
+ chapterId=chapterId,
+ chapterLevel=chapterLevel,
+ chapterTitle=chapterTitle,
+ generationHint=generationHint,
+ contentPartIds=contentPartIds,
+ contentPartInstructions=contentPartInstructions,
+ contentParts=contentParts,
+ userPrompt=userPrompt,
+ language=language
+ )
+
+ # AI-Call für Chapter-Struktur-Generierung
+ # Note: Debug logging is handled by callAiPlanning
+ aiResponse = await self.aiService.callAiPlanning(
+ prompt=chapterPrompt,
+ debugType=f"chapter_structure_{chapterId}"
+ )
+
+ sectionsStructure = json.loads(
+ self.services.utils.jsonExtractString(aiResponse)
+ )
+
+ chapter["sections"] = sectionsStructure.get("sections", [])
+
+            # Set the useAiCall flag (if not already set by the AI)
+            # IMPORTANT: useAiCall can only be true if at least one ContentPart has format "extracted"!
+            # "object" and "reference" formats are added directly as elements and need no AI call.
+ for section in chapter["sections"]:
+ if "useAiCall" not in section:
+ contentType = section.get("content_type", "paragraph")
+ sectionContentPartIds = section.get("contentPartIds", [])
+
+ # Prüfe ob mindestens ein ContentPart Format "extracted" hat
+ hasExtractedPart = False
+ for partId in sectionContentPartIds:
+ part = self._findContentPartById(partId, contentParts)
+ if part:
+ contentFormat = part.metadata.get("contentFormat", "unknown")
+ if contentFormat == "extracted":
+ hasExtractedPart = True
+ break
+
+ # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind
+ useAiCall = False
+ if hasExtractedPart:
+                        # Check whether a transformation is needed
+ useAiCall = contentType != "paragraph"
+
+                    # Check contentPartInstructions for a required transformation
+ if not useAiCall:
+ for partId in sectionContentPartIds:
+ instruction = contentPartInstructions.get(partId, {}).get("instruction", "")
+ if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]:
+ useAiCall = True
+ break
+
+ section["useAiCall"] = useAiCall
+ logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})")
+
+ # Update progress after chapter completion
+ progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ parentOperationId,
+ progress,
+ f"Chapter {chapterIndex}/{totalChapters} completed: {chapterTitle}"
+ )
+
+ except Exception as e:
+ logger.error(f"Error generating sections structure for chapter {chapterId}: {str(e)}")
+ # Set empty sections on error
+ chapter["sections"] = []
+ # Update progress even on error
+ progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ parentOperationId,
+ progress,
+ f"Chapter {chapterIndex}/{totalChapters} error: {chapterTitle}"
+ )
+ raise
+
async def _generateChapterSectionsStructure(
self,
chapterStructure: Dict[str, Any],
contentParts: List[ContentPart],
userPrompt: str,
- parentOperationId: str
+ parentOperationId: str,
+ language: str
) -> Dict[str, Any]:
"""
- Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content).
+ Phase 5D.1: Generiert Sections-Struktur für jedes Chapter (ohne Content) in parallel.
Sections enthalten: content_type, contentPartIds, generationHint, useAiCall
"""
# Count total chapters for progress tracking
totalChapters = sum(len(doc.get("chapters", [])) for doc in chapterStructure.get("documents", []))
+
+ # Collect all chapters with their indices for parallel processing
+ chapterTasks = []
chapterIndex = 0
for doc in chapterStructure.get("documents", []):
@@ -128,15 +266,10 @@ class StructureFiller:
contentPartIds = chapter.get("contentPartIds", [])
contentPartInstructions = chapter.get("contentPartInstructions", {})
- # Update progress for chapter structure generation
- progress = chapterIndex / totalChapters if totalChapters > 0 else 1.0
- self.services.chat.progressLogUpdate(
- parentOperationId,
- progress,
- f"Generating sections for Chapter {chapterIndex}/{totalChapters}: {chapterTitle}"
- )
-
- chapterPrompt = self._buildChapterSectionsStructurePrompt(
+ # Create task for parallel processing
+ task = self._generateSingleChapterSectionsStructure(
+ chapter=chapter,
+ chapterIndex=chapterIndex,
chapterId=chapterId,
chapterLevel=chapterLevel,
chapterTitle=chapterTitle,
@@ -144,69 +277,926 @@ class StructureFiller:
contentPartIds=contentPartIds,
contentPartInstructions=contentPartInstructions,
contentParts=contentParts,
- userPrompt=userPrompt
+ userPrompt=userPrompt,
+ language=language,
+ parentOperationId=parentOperationId,
+ totalChapters=totalChapters
)
-
- # AI-Call für Chapter-Struktur-Generierung
- # Note: Debug logging is handled by callAiPlanning
- aiResponse = await self.aiService.callAiPlanning(
- prompt=chapterPrompt,
- debugType=f"chapter_structure_{chapterId}"
- )
-
- sectionsStructure = json.loads(
- self.services.utils.jsonExtractString(aiResponse)
- )
-
- chapter["sections"] = sectionsStructure.get("sections", [])
-
- # Setze useAiCall Flag (falls nicht von AI gesetzt)
- # WICHTIG: useAiCall kann nur true sein, wenn mindestens ein ContentPart Format "extracted" hat!
- # "object" und "reference" Formate werden direkt als Elemente hinzugefügt, benötigen kein AI.
- for section in chapter["sections"]:
- if "useAiCall" not in section:
- contentType = section.get("content_type", "paragraph")
- contentPartIds = section.get("contentPartIds", [])
-
- # Prüfe ob mindestens ein ContentPart Format "extracted" hat
- hasExtractedPart = False
- for partId in contentPartIds:
- part = self._findContentPartById(partId, contentParts)
- if part:
- contentFormat = part.metadata.get("contentFormat", "unknown")
- if contentFormat == "extracted":
- hasExtractedPart = True
- break
-
- # useAiCall kann nur true sein, wenn extracted Parts vorhanden sind
- useAiCall = False
- if hasExtractedPart:
- # Prüfe ob Transformation nötig ist
- useAiCall = contentType != "paragraph"
-
- # Prüfe contentPartInstructions für Transformation
- if not useAiCall:
- for partId in contentPartIds:
- instruction = contentPartInstructions.get(partId, {}).get("instruction", "")
- if instruction and instruction.lower() not in ["include full text", "include all content", "use full extracted text"]:
- useAiCall = True
- break
-
- section["useAiCall"] = useAiCall
- logger.debug(f"Section {section.get('id')}: useAiCall={useAiCall} (hasExtractedPart={hasExtractedPart}, contentType={contentType})")
+ chapterTasks.append((chapterIndex, chapter, task))
+
+ # Execute all chapter tasks in parallel
+ if chapterTasks:
+ # Create list of tasks (without indices for gather)
+ tasks = [task for _, _, task in chapterTasks]
+
+ # Execute in parallel with error handling
+ results = await asyncio.gather(*tasks, return_exceptions=True)
+
+ # Process results in order and handle errors
+ for (originalIndex, originalChapter, _), result in zip(chapterTasks, results):
+ if isinstance(result, Exception):
+ logger.error(f"Error processing chapter {originalChapter.get('id')}: {str(result)}")
+ # Chapter already has empty sections set by _generateSingleChapterSectionsStructure
+ # Continue processing other chapters
return chapterStructure
+ async def _processAiResponseForSection(
+ self,
+ aiResponse: Any,
+ contentType: str,
+ operationType: OperationTypeEnum,
+ sectionId: str,
+ generationHint: str,
+ generatedElements: List[Dict[str, Any]]
+ ) -> List[Dict[str, Any]]:
+ """
+ Helper method to process AI response and extract elements.
+ Handles both IMAGE_GENERATE and DATA_ANALYSE operation types.
+ """
+ elements = []
+
+ # Handle IMAGE_GENERATE differently - returns image data directly
+ if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE:
+ import base64
+ base64Data = ""
+
+ # Convert image data to base64 string if needed
+ if isinstance(aiResponse.content, bytes):
+ base64Data = base64.b64encode(aiResponse.content).decode('utf-8')
+ elif isinstance(aiResponse.content, str):
+ # Check if it's already a JSON structure
+ try:
+ jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content))
+ if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
+ elements.append(jsonContent)
+ logger.debug("AI returned proper JSON image structure")
+ base64Data = None # Signal that image was already processed
+ elif isinstance(jsonContent, list) and len(jsonContent) > 0:
+ if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
+ elements.extend(jsonContent)
+ logger.debug("AI returned proper JSON image structure in list")
+ base64Data = None # Signal that image was already processed
+ else:
+ base64Data = "" # Continue with normal processing
+ else:
+ base64Data = "" # Continue with normal processing
+ except (json.JSONDecodeError, ValueError, AttributeError):
+ base64Data = "" # Will be processed below
+
+ # Process base64 if not already handled above
+ if base64Data is None:
+ # Already processed as JSON, skip base64 processing
+ pass
+ elif aiResponse.content.startswith("data:image/"):
+ # Extract base64 from data URI
+ base64Data = aiResponse.content.split(",", 1)[1]
+ else:
+ content_stripped = aiResponse.content.strip()
+ if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]):
+ base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "")
+ else:
+ base64Data = aiResponse.content
+ else:
+ base64Data = ""
+
+ # Always create proper JSON structure for images (if not already processed)
+ if base64Data is None:
+ # Image already processed as JSON, skip
+ pass
+ elif base64Data:
+ elements.append({
+ "type": "image",
+ "content": {
+ "base64Data": base64Data,
+ "altText": generationHint or "Generated image",
+ "caption": ""
+ }
+ })
+ logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}")
+ else:
+ logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}")
+ elements.append({
+ "type": "error",
+ "message": f"Image generation returned empty or invalid content",
+ "sectionId": sectionId
+ })
+ else:
+ # For non-image content: Use already parsed elements from _callAiWithLooping
+ if generatedElements:
+ elements.extend(generatedElements)
+ else:
+ # Fallback: Try to parse JSON response directly
+ try:
+ fallbackElements = json.loads(
+ self.services.utils.jsonExtractString(aiResponse.content)
+ )
+ if isinstance(fallbackElements, list):
+ elements.extend(fallbackElements)
+ elif isinstance(fallbackElements, dict) and "elements" in fallbackElements:
+ elements.extend(fallbackElements["elements"])
+ elif isinstance(fallbackElements, dict) and fallbackElements.get("type"):
+ elements.append(fallbackElements)
+ except (json.JSONDecodeError, ValueError) as json_error:
+ logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}")
+ elements.append({
+ "type": "error",
+ "message": f"Failed to parse JSON response: {str(json_error)}",
+ "sectionId": sectionId
+ })
+
+ return elements
+
+ async def _processSingleSection(
+ self,
+ section: Dict[str, Any],
+ sectionIndex: int,
+ totalSections: int,
+ chapterIndex: int,
+ totalChapters: int,
+ chapterId: str,
+ chapterOperationId: str,
+ fillOperationId: str,
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ all_sections_list: List[Dict[str, Any]],
+ language: str,
+ calculateOverallProgress: callable
+ ) -> List[Dict[str, Any]]:
+ """
+ Process a single section and return its elements.
+ Used for parallel processing of sections within a chapter.
+ """
+ sectionId = section.get("id")
+ sectionTitle = section.get("title", sectionId)
+ contentPartIds = section.get("contentPartIds", [])
+ contentFormats = section.get("contentFormats", {})
+ generationHint = section.get("generationHint") or section.get("generation_hint")
+ contentType = section.get("content_type", "paragraph")
+ useAiCall = section.get("useAiCall", False)
+
+ # Update overall progress at start of section
+ overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections)
+ self.services.chat.progressLogUpdate(
+ fillOperationId,
+ overallProgress,
+ f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}"
+ )
+
+        # IMPORTANT: without any ContentParts AND without a generationHint, no AI call can be made
+ if len(contentPartIds) == 0 and not generationHint:
+ useAiCall = False
+ logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False")
+ elif len(contentPartIds) == 0 and generationHint and not useAiCall:
+ useAiCall = True
+ logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)")
+
+ elements = []
+
+        # Check whether aggregation is needed
+ needsAggregation = self._needsAggregation(
+ contentType=contentType,
+ contentPartCount=len(contentPartIds)
+ )
+
+ logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}")
+
+ try:
+ if needsAggregation and useAiCall:
+                # Aggregation: process all parts together
+ sectionParts = [
+ self._findContentPartById(pid, contentParts)
+ for pid in contentPartIds
+ ]
+ sectionParts = [p for p in sectionParts if p is not None]
+
+ if sectionParts:
+                    # Keep only "extracted" parts for aggregation (reference/object are handled separately)
+ extractedParts = [
+ p for p in sectionParts
+ if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted"
+ ]
+ nonExtractedParts = [
+ p for p in sectionParts
+ if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted"
+ ]
+
+                    # Handle non-extracted parts separately (reference, object)
+ for part in nonExtractedParts:
+ contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat"))
+
+ if contentFormat == "reference":
+ elements.append({
+ "type": "reference",
+ "documentReference": part.metadata.get("documentReference"),
+ "label": part.metadata.get("usageHint", part.label)
+ })
+ elif contentFormat == "object":
+ if part.typeGroup == "image":
+ elements.append({
+ "type": "image",
+ "content": {
+ "base64Data": part.data,
+ "altText": part.metadata.get("usageHint", part.label),
+ "caption": part.metadata.get("caption", "")
+ }
+ })
+ else:
+ elements.append({
+ "type": part.typeGroup,
+ "content": {
+ "data": part.data,
+ "mimeType": part.mimeType,
+ "label": part.metadata.get("usageHint", part.label)
+ }
+ })
+
+                    # Aggregate extracted parts with an AI call
+ if extractedParts:
+ logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI")
+ isAggregation = True
+ generationPrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=extractedParts,
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=all_sections_list,
+ sectionIndex=sectionIndex,
+ isAggregation=isAggregation,
+ language=language
+ )
+
+ sectionOperationId = f"{fillOperationId}_section_{sectionId}"
+ self.services.chat.progressLogStart(
+ sectionOperationId,
+ "Section Generation (Aggregation)",
+ f"Section {sectionIndex + 1}/{totalSections}",
+ f"{sectionTitle} ({len(extractedParts)} parts)",
+ parentOperationId=chapterOperationId
+ )
+
+ try:
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+ logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
+
+ operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
+
+ if operationType == OperationTypeEnum.IMAGE_GENERATE:
+ maxPromptLength = 4000
+ if len(generationPrompt) > maxPromptLength:
+ logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
+ generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+
+ request = AiCallRequest(
+ prompt=generationPrompt,
+ contentParts=[],
+ options=AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+ )
+ aiResponse = await self.aiService.callAi(request)
+ generatedElements = []
+ else:
+ async def buildSectionPromptWithContinuation(
+ section: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ generationHint: str,
+ allSections: List[Dict[str, Any]],
+ sectionIndex: int,
+ isAggregation: bool,
+ continuationContext: Dict[str, Any],
+ services: Any
+ ) -> str:
+ basePrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=contentParts,
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=allSections,
+ sectionIndex=sectionIndex,
+ isAggregation=isAggregation,
+ language=language
+ )
+
+ continuationInfo = continuationContext.get("delivered_summary", "")
+ cutOffElement = continuationContext.get("cut_off_element", "")
+
+ continuationPrompt = f"""{basePrompt}
+
+--- CONTINUATION REQUEST ---
+The previous JSON response was incomplete. Please continue from where it stopped.
+
+PREVIOUSLY DELIVERED SUMMARY:
+{continuationInfo}
+
+LAST INCOMPLETE ELEMENT:
+{cutOffElement}
+
+TASK: Continue generating the JSON elements array from where it was cut off.
+Complete the incomplete element and continue with remaining elements.
+
+Return ONLY the continuation JSON (starting from the incomplete element).
+The JSON should be a fragment that can be merged with the previous response."""
+ return continuationPrompt
+
+ options = AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+
+ aiResponseJson = await self.aiService._callAiWithLooping(
+ prompt=generationPrompt,
+ options=options,
+ debugPrefix=f"{chapterId}_section_{sectionId}",
+ promptBuilder=buildSectionPromptWithContinuation,
+ promptArgs={
+ "section": section,
+ "contentParts": extractedParts,
+ "userPrompt": userPrompt,
+ "generationHint": generationHint,
+ "allSections": all_sections_list,
+ "sectionIndex": sectionIndex,
+ "isAggregation": isAggregation,
+ "services": self.services
+ },
+ operationId=sectionOperationId,
+ userPrompt=userPrompt,
+ contentParts=extractedParts
+ )
+
+ try:
+ parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson))
+ if isinstance(parsedResponse, list):
+ generatedElements = parsedResponse
+ elif isinstance(parsedResponse, dict):
+ if "elements" in parsedResponse:
+ generatedElements = parsedResponse["elements"]
+ elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0:
+ firstSection = parsedResponse["sections"][0]
+ generatedElements = firstSection.get("elements", [])
+ elif parsedResponse.get("type"):
+ generatedElements = [parsedResponse]
+ else:
+ generatedElements = []
+ else:
+ generatedElements = []
+
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+
+ aiResponse = AiResponse(aiResponseJson)
+ except Exception as parseError:
+ logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}")
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+ aiResponse = AiResponse(aiResponseJson)
+ generatedElements = []
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
+ logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
+
+ # Process AI response
+ responseElements = await self._processAiResponseForSection(
+ aiResponse=aiResponse,
+ contentType=contentType,
+ operationType=operationType,
+ sectionId=sectionId,
+ generationHint=generationHint,
+ generatedElements=generatedElements
+ )
+ elements.extend(responseElements)
+
+ self.services.chat.progressLogFinish(sectionOperationId, True)
+
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed"
+ )
+
+ except Exception as e:
+ self.services.chat.progressLogFinish(sectionOperationId, False)
+ elements.append({
+ "type": "error",
+ "message": f"Error generating section {sectionId}: {str(e)}",
+ "sectionId": sectionId
+ })
+ logger.error(f"Error generating section {sectionId}: {str(e)}")
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
+ )
+
+ else:
+            # Individual processing: each part on its own OR generation without ContentParts
+ if len(contentPartIds) == 0 and useAiCall and generationHint:
+ # Generate content from scratch using only generationHint
+ logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only")
+ generationPrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=[],
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=all_sections_list,
+ sectionIndex=sectionIndex,
+ isAggregation=False,
+ language=language
+ )
+
+ sectionOperationId = f"{fillOperationId}_section_{sectionId}"
+ self.services.chat.progressLogStart(
+ sectionOperationId,
+ "Section Generation",
+ f"Section {sectionIndex + 1}/{totalSections}",
+ f"{sectionTitle} (from generationHint)",
+ parentOperationId=chapterOperationId
+ )
+
+ try:
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+ logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
+
+ operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
+
+ if operationType == OperationTypeEnum.IMAGE_GENERATE:
+ maxPromptLength = 4000
+ if len(generationPrompt) > maxPromptLength:
+ logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
+ generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+
+ request = AiCallRequest(
+ prompt=generationPrompt,
+ contentParts=[],
+ options=AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+ )
+ aiResponse = await self.aiService.callAi(request)
+ generatedElements = []
+ else:
+ isAggregation = False
+
+ async def buildSectionPromptWithContinuation(
+ section: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ generationHint: str,
+ allSections: List[Dict[str, Any]],
+ sectionIndex: int,
+ isAggregation: bool,
+ continuationContext: Dict[str, Any],
+ services: Any
+ ) -> str:
+ basePrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=contentParts,
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=allSections,
+ sectionIndex=sectionIndex,
+ isAggregation=isAggregation,
+ language=language
+ )
+
+ continuationInfo = continuationContext.get("delivered_summary", "")
+ cutOffElement = continuationContext.get("cut_off_element", "")
+
+ continuationPrompt = f"""{basePrompt}
+
+--- CONTINUATION REQUEST ---
+The previous JSON response was incomplete. Please continue from where it stopped.
+
+PREVIOUSLY DELIVERED SUMMARY:
+{continuationInfo}
+
+LAST INCOMPLETE ELEMENT:
+{cutOffElement}
+
+TASK: Continue generating the JSON elements array from where it was cut off.
+Complete the incomplete element and continue with remaining elements.
+
+Return ONLY the continuation JSON (starting from the incomplete element).
+The JSON should be a fragment that can be merged with the previous response."""
+ return continuationPrompt
+
+ options = AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+
+ aiResponseJson = await self.aiService._callAiWithLooping(
+ prompt=generationPrompt,
+ options=options,
+ debugPrefix=f"{chapterId}_section_{sectionId}",
+ promptBuilder=buildSectionPromptWithContinuation,
+ promptArgs={
+ "section": section,
+ "contentParts": [],
+ "userPrompt": userPrompt,
+ "generationHint": generationHint,
+ "allSections": all_sections_list,
+ "sectionIndex": sectionIndex,
+ "isAggregation": isAggregation,
+ "services": self.services
+ },
+ operationId=sectionOperationId,
+ userPrompt=userPrompt,
+ contentParts=[]
+ )
+
+ try:
+ parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson))
+ if isinstance(parsedResponse, list):
+ generatedElements = parsedResponse
+ elif isinstance(parsedResponse, dict):
+ if "elements" in parsedResponse:
+ generatedElements = parsedResponse["elements"]
+ elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0:
+ firstSection = parsedResponse["sections"][0]
+ generatedElements = firstSection.get("elements", [])
+ elif parsedResponse.get("type"):
+ generatedElements = [parsedResponse]
+ else:
+ generatedElements = []
+ else:
+ generatedElements = []
+
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+
+ aiResponse = AiResponse(aiResponseJson)
+ except Exception as parseError:
+ logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}")
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+ aiResponse = AiResponse(aiResponseJson)
+ generatedElements = []
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
+ logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
+
+ responseElements = await self._processAiResponseForSection(
+ aiResponse=aiResponse,
+ contentType=contentType,
+ operationType=operationType,
+ sectionId=sectionId,
+ generationHint=generationHint,
+ generatedElements=generatedElements
+ )
+ elements.extend(responseElements)
+
+ self.services.chat.progressLogFinish(sectionOperationId, True)
+
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed"
+ )
+
+ except Exception as e:
+ self.services.chat.progressLogFinish(sectionOperationId, False)
+ elements.append({
+ "type": "error",
+ "message": f"Error generating section {sectionId}: {str(e)}",
+ "sectionId": sectionId
+ })
+ logger.error(f"Error generating section {sectionId}: {str(e)}")
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
+ )
+
+            # Individual processing: each part on its own
+ for partId in contentPartIds:
+ part = self._findContentPartById(partId, contentParts)
+ if not part:
+ continue
+
+ contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat"))
+
+ if contentFormat == "reference":
+ elements.append({
+ "type": "reference",
+ "documentReference": part.metadata.get("documentReference"),
+ "label": part.metadata.get("usageHint", part.label)
+ })
+
+ elif contentFormat == "object":
+ if part.typeGroup == "image":
+ elements.append({
+ "type": "image",
+ "content": {
+ "base64Data": part.data,
+ "altText": part.metadata.get("usageHint", part.label),
+ "caption": part.metadata.get("caption", "")
+ }
+ })
+ else:
+ elements.append({
+ "type": part.typeGroup,
+ "content": {
+ "data": part.data,
+ "mimeType": part.mimeType,
+ "label": part.metadata.get("usageHint", part.label)
+ }
+ })
+
+ elif contentFormat == "extracted":
+ if useAiCall and generationHint:
+ # AI-Call mit einzelnen ContentPart
+ logger.debug(f"Processing section {sectionId}: Single extracted part with AI call")
+ generationPrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=[part],
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=all_sections_list,
+ sectionIndex=sectionIndex,
+ isAggregation=False,
+ language=language
+ )
+
+ sectionOperationId = f"{fillOperationId}_section_{sectionId}"
+ self.services.chat.progressLogStart(
+ sectionOperationId,
+ "Section Generation",
+ f"Section {sectionIndex + 1}/{totalSections}",
+ f"{sectionTitle} (single part)",
+ parentOperationId=chapterOperationId
+ )
+
+ try:
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+ logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
+
+ operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
+
+ if operationType == OperationTypeEnum.IMAGE_GENERATE:
+ maxPromptLength = 4000
+ if len(generationPrompt) > maxPromptLength:
+ logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
+ generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+
+ request = AiCallRequest(
+ prompt=generationPrompt,
+ contentParts=[],
+ options=AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+ )
+ aiResponse = await self.aiService.callAi(request)
+ generatedElements = []
+ else:
+ isAggregation = False
+
+ async def buildSectionPromptWithContinuation(
+ section: Dict[str, Any],
+ contentParts: List[ContentPart],
+ userPrompt: str,
+ generationHint: str,
+ allSections: List[Dict[str, Any]],
+ sectionIndex: int,
+ isAggregation: bool,
+ continuationContext: Dict[str, Any],
+ services: Any
+ ) -> str:
+ basePrompt = self._buildSectionGenerationPrompt(
+ section=section,
+ contentParts=contentParts,
+ userPrompt=userPrompt,
+ generationHint=generationHint,
+ allSections=allSections,
+ sectionIndex=sectionIndex,
+ isAggregation=isAggregation,
+ language=language
+ )
+
+ continuationInfo = continuationContext.get("delivered_summary", "")
+ cutOffElement = continuationContext.get("cut_off_element", "")
+
+ continuationPrompt = f"""{basePrompt}
+
+--- CONTINUATION REQUEST ---
+The previous JSON response was incomplete. Please continue from where it stopped.
+
+PREVIOUSLY DELIVERED SUMMARY:
+{continuationInfo}
+
+LAST INCOMPLETE ELEMENT:
+{cutOffElement}
+
+TASK: Continue generating the JSON elements array from where it was cut off.
+Complete the incomplete element and continue with remaining elements.
+
+Return ONLY the continuation JSON (starting from the incomplete element).
+The JSON should be a fragment that can be merged with the previous response."""
+ return continuationPrompt
+
+ options = AiCallOptions(
+ operationType=operationType,
+ priority=PriorityEnum.BALANCED,
+ processingMode=ProcessingModeEnum.DETAILED
+ )
+
+ aiResponseJson = await self.aiService._callAiWithLooping(
+ prompt=generationPrompt,
+ options=options,
+ debugPrefix=f"{chapterId}_section_{sectionId}",
+ promptBuilder=buildSectionPromptWithContinuation,
+ promptArgs={
+ "section": section,
+ "contentParts": [part],
+ "userPrompt": userPrompt,
+ "generationHint": generationHint,
+ "allSections": all_sections_list,
+ "sectionIndex": sectionIndex,
+ "isAggregation": isAggregation,
+ "services": self.services
+ },
+ operationId=sectionOperationId,
+ userPrompt=userPrompt,
+ contentParts=[part]
+ )
+
+ try:
+ parsedResponse = json.loads(self.services.utils.jsonExtractString(aiResponseJson))
+ if isinstance(parsedResponse, list):
+ generatedElements = parsedResponse
+ elif isinstance(parsedResponse, dict):
+ if "elements" in parsedResponse:
+ generatedElements = parsedResponse["elements"]
+ elif "sections" in parsedResponse and len(parsedResponse["sections"]) > 0:
+ firstSection = parsedResponse["sections"][0]
+ generatedElements = firstSection.get("elements", [])
+ elif parsedResponse.get("type"):
+ generatedElements = [parsedResponse]
+ else:
+ generatedElements = []
+ else:
+ generatedElements = []
+
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+
+ aiResponse = AiResponse(aiResponseJson)
+ except Exception as parseError:
+ logger.error(f"Error parsing response from _callAiWithLooping for section {sectionId}: {str(parseError)}")
+ class AiResponse:
+ def __init__(self, content):
+ self.content = content
+ aiResponse = AiResponse(aiResponseJson)
+ generatedElements = []
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
+ logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+
+ self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
+
+ responseElements = await self._processAiResponseForSection(
+ aiResponse=aiResponse,
+ contentType=contentType,
+ operationType=operationType,
+ sectionId=sectionId,
+ generationHint=generationHint,
+ generatedElements=generatedElements
+ )
+ elements.extend(responseElements)
+
+ self.services.chat.progressLogFinish(sectionOperationId, True)
+
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed"
+ )
+
+ except Exception as e:
+ self.services.chat.progressLogFinish(sectionOperationId, False)
+ elements.append({
+ "type": "error",
+ "message": f"Error generating section {sectionId}: {str(e)}",
+ "sectionId": sectionId
+ })
+ logger.error(f"Error generating section {sectionId}: {str(e)}")
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
+ )
+ else:
+ # Füge extrahierten Content direkt hinzu (kein AI-Call)
+ if part.typeGroup == "image":
+ logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call")
+ elements.append({
+ "type": "image",
+ "content": {
+ "base64Data": part.data,
+ "altText": part.metadata.get("usageHint", part.label),
+ "caption": part.metadata.get("caption", "")
+ }
+ })
+ else:
+ logger.debug(f"Processing section {sectionId}: Single extracted TEXT part WITHOUT AI call")
+ elements.append({
+ "type": "extracted_text",
+ "content": part.data,
+ "source": part.metadata.get("documentId"),
+ "extractionPrompt": part.metadata.get("extractionPrompt")
+ })
+
+ # Update progress after section completion
+ chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
+ self.services.chat.progressLogUpdate(
+ chapterOperationId,
+ chapterProgress,
+ f"Section {sectionIndex + 1}/{totalSections} completed"
+ )
+
+ overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections)
+ self.services.chat.progressLogUpdate(
+ fillOperationId,
+ overallProgress,
+ f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed"
+ )
+
+ except Exception as e:
+ logger.error(f"Unexpected error processing section {sectionId}: {str(e)}")
+ elements.append({
+ "type": "error",
+ "message": f"Unexpected error processing section {sectionId}: {str(e)}",
+ "sectionId": sectionId
+ })
+
+ return elements
+
async def _fillChapterSections(
self,
chapterStructure: Dict[str, Any],
contentParts: List[ContentPart],
userPrompt: str,
- parentOperationId: str
+ parentOperationId: str,
+ language: str
) -> Dict[str, Any]:
"""
Phase 5D.2: Füllt Sections mit ContentParts.
"""
+
# Sammle alle Sections für Kontext-Informationen (für alle Sections)
all_sections_list = []
for doc in chapterStructure.get("documents", []):
@@ -252,768 +1242,48 @@ class StructureFiller:
parentOperationId=fillOperationId
)
- # Process sections within chapter
+ # Process sections within chapter in parallel
+ sectionTasks = []
for sectionIndex, section in enumerate(sections):
- sectionId = section.get("id")
- sectionTitle = section.get("title", sectionId)
- contentPartIds = section.get("contentPartIds", [])
- contentFormats = section.get("contentFormats", {})
- # Check both camelCase and snake_case for generationHint
- generationHint = section.get("generationHint") or section.get("generation_hint")
- contentType = section.get("content_type", "paragraph")
- useAiCall = section.get("useAiCall", False)
-
- # Update overall progress at start of section
- overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex, totalSections)
- self.services.chat.progressLogUpdate(
- fillOperationId,
- overallProgress,
- f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections}: {sectionTitle}"
+ # Create task for parallel processing
+ task = self._processSingleSection(
+ section=section,
+ sectionIndex=sectionIndex,
+ totalSections=totalSections,
+ chapterIndex=chapterIndex,
+ totalChapters=totalChapters,
+ chapterId=chapterId,
+ chapterOperationId=chapterOperationId,
+ fillOperationId=fillOperationId,
+ contentParts=contentParts,
+ userPrompt=userPrompt,
+ all_sections_list=all_sections_list,
+ language=language,
+ calculateOverallProgress=calculateOverallProgress
)
+ sectionTasks.append((sectionIndex, section, task))
+
+ # Execute all section tasks in parallel
+ if sectionTasks:
+ # Create list of tasks (without indices for gather)
+ tasks = [task for _, _, task in sectionTasks]
- # WICHTIG: Wenn keine ContentParts vorhanden sind UND kein generationHint, kann kein AI-Call gemacht werden
- # Aber: Wenn generationHint vorhanden ist, SOLLTE AI verwendet werden, auch wenn useAiCall=false gesetzt ist
- # (z.B. wenn AI die Struktur generiert hat, aber useAiCall falsch gesetzt wurde)
- if len(contentPartIds) == 0 and not generationHint:
- useAiCall = False
- logger.debug(f"Section {sectionId}: No content parts and no generation hint, setting useAiCall=False")
- elif len(contentPartIds) == 0 and generationHint and not useAiCall:
- # Override: If there's a generationHint but no content parts, we should use AI
- # This handles cases where structure generation set useAiCall=false incorrectly
- useAiCall = True
- logger.info(f"Section {sectionId}: Overriding useAiCall=True (has generationHint but no content parts)")
+ # Execute in parallel with error handling
+ results = await asyncio.gather(*tasks, return_exceptions=True)
- elements = []
-
- # Prüfe ob Aggregation nötig ist
- needsAggregation = self._needsAggregation(
- contentType=contentType,
- contentPartCount=len(contentPartIds)
- )
-
- logger.info(f"Processing section {sectionId}: contentType={contentType}, contentPartCount={len(contentPartIds)}, useAiCall={useAiCall}, needsAggregation={needsAggregation}, hasGenerationHint={bool(generationHint)}")
-
- if needsAggregation and useAiCall:
- # Aggregation: Alle Parts zusammen verarbeiten
- sectionParts = [
- self._findContentPartById(pid, contentParts)
- for pid in contentPartIds
- ]
- sectionParts = [p for p in sectionParts if p is not None]
-
- if sectionParts:
- # Filtere nur extracted Parts für Aggregation (reference/object werden separat behandelt)
- extractedParts = [
- p for p in sectionParts
- if contentFormats.get(p.id, p.metadata.get("contentFormat")) == "extracted"
- ]
- nonExtractedParts = [
- p for p in sectionParts
- if contentFormats.get(p.id, p.metadata.get("contentFormat")) != "extracted"
- ]
-
- # Verarbeite non-extracted Parts separat (reference, object)
- for part in nonExtractedParts:
- contentFormat = contentFormats.get(part.id, part.metadata.get("contentFormat"))
-
- if contentFormat == "reference":
- elements.append({
- "type": "reference",
- "documentReference": part.metadata.get("documentReference"),
- "label": part.metadata.get("usageHint", part.label)
- })
- elif contentFormat == "object":
- # Nested content structure for objects
- if part.typeGroup == "image":
- elements.append({
- "type": "image",
- "content": {
- "base64Data": part.data,
- "altText": part.metadata.get("usageHint", part.label),
- "caption": part.metadata.get("caption", "")
- }
- })
- else:
- elements.append({
- "type": part.typeGroup,
- "content": {
- "data": part.data,
- "mimeType": part.mimeType,
- "label": part.metadata.get("usageHint", part.label)
- }
- })
-
- # Aggregiere extracted Parts mit AI
- if extractedParts:
- logger.debug(f"Section {sectionId}: Aggregating {len(extractedParts)} extracted parts with AI")
- generationPrompt = self._buildSectionGenerationPrompt(
- section=section,
- contentParts=extractedParts, # ALLE PARTS für Aggregation!
- userPrompt=userPrompt,
- generationHint=generationHint,
- allSections=all_sections_list,
- sectionIndex=sectionIndex,
- isAggregation=True
- )
-
- # Erstelle Operation-ID für Section-Generierung
- sectionOperationId = f"{fillOperationId}_section_{sectionId}"
-
- # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId)
- self.services.chat.progressLogStart(
- sectionOperationId,
- "Section Generation (Aggregation)",
- f"Section {sectionIndex + 1}/{totalSections}",
- f"{sectionTitle} ({len(extractedParts)} parts)",
- parentOperationId=chapterOperationId
- )
-
- try:
- # Update: Building prompt
- self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-
- # Debug: Log Prompt
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)")
-
- # Update: Calling AI
- self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
-
- # Verwende callAi für ContentParts-Unterstützung (nicht callAiPlanning!)
- # Use IMAGE_GENERATE for image content type
- operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
-
- # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit)
- if operationType == OperationTypeEnum.IMAGE_GENERATE:
- maxPromptLength = 4000
- if len(generationPrompt) > maxPromptLength:
- logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
- # Keep the beginning (task, metadata, generation hint) and truncate from end
- generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline
-
- # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks
- contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else extractedParts
- request = AiCallRequest(
- prompt=generationPrompt,
- contentParts=contentPartsForCall, # Empty for IMAGE_GENERATE, all parts for others
- options=AiCallOptions(
- operationType=operationType,
- priority=PriorityEnum.BALANCED,
- processingMode=ProcessingModeEnum.DETAILED
- )
- )
- aiResponse = await self.aiService.callAi(request)
-
- # Update: Processing response
- self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-
- # Debug: Log Response
- self.services.utils.writeDebugFile(
- aiResponse.content,
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)")
-
- # Update: Validating content
- self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
-
- # Handle IMAGE_GENERATE differently - returns image data directly
- if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE:
- import base64
- base64Data = ""
-
- # Convert image data to base64 string if needed
- if isinstance(aiResponse.content, bytes):
- base64Data = base64.b64encode(aiResponse.content).decode('utf-8')
- elif isinstance(aiResponse.content, str):
- # Check if it's already a JSON structure
- try:
- # Try to parse as JSON first
- jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content))
- # If it's already a proper JSON structure with image element, use it
- if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
- elements.append(jsonContent)
- logger.debug("AI returned proper JSON image structure")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- elif isinstance(jsonContent, list) and len(jsonContent) > 0:
- # Check if first element is an image
- if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
- elements.extend(jsonContent)
- logger.debug("AI returned proper JSON image structure in list")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- else:
- base64Data = "" # Continue with normal processing
- except (json.JSONDecodeError, ValueError, AttributeError):
- # Not JSON, treat as base64 string or data URI
- base64Data = "" # Will be processed below
-
- # Process base64 if not already handled above
- if base64Data is None:
- # Already processed as JSON, skip base64 processing
- pass
- elif aiResponse.content.startswith("data:image/"):
- # Extract base64 from data URI
- base64Data = aiResponse.content.split(",", 1)[1]
- else:
- # Check if it looks like base64 (alphanumeric + / + =)
- content_stripped = aiResponse.content.strip()
- if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]):
- # Looks like base64, use it
- base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "")
- else:
- base64Data = aiResponse.content
- else:
- base64Data = ""
-
- # Always create proper JSON structure for images (if not already processed)
- if base64Data is None:
- # Image already processed as JSON, skip
- pass
- elif base64Data:
- elements.append({
- "type": "image",
- "content": {
- "base64Data": base64Data,
- "altText": generationHint or "Generated image",
- "caption": ""
- }
- })
- logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}")
- else:
- logger.warning(f"IMAGE_GENERATE returned empty or invalid content for section {sectionId}")
- elements.append({
- "type": "error",
- "message": f"Image generation returned empty or invalid content",
- "sectionId": sectionId
- })
- else:
- # Parse JSON response for other content types
- try:
- generatedElements = json.loads(
- self.services.utils.jsonExtractString(aiResponse.content)
- )
- if isinstance(generatedElements, list):
- elements.extend(generatedElements)
- elif isinstance(generatedElements, dict) and "elements" in generatedElements:
- elements.extend(generatedElements["elements"])
- elif isinstance(generatedElements, dict) and generatedElements.get("type"):
- # Single element in dict format
- elements.append(generatedElements)
- except (json.JSONDecodeError, ValueError) as json_error:
- logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}")
- # Try to extract any image data that might be in the response
- if contentType == "image":
- # Check if response content might be base64 image data
- content_str = str(aiResponse.content)
- if len(content_str) > 100:
- elements.append({
- "type": "error",
- "message": f"Failed to parse image generation response: {str(json_error)}",
- "sectionId": sectionId
- })
- else:
- elements.append({
- "type": "error",
- "message": f"Failed to parse JSON response: {str(json_error)}",
- "sectionId": sectionId
- })
-
- # ChatLog abschließen
- self.services.chat.progressLogFinish(sectionOperationId, True)
-
- # Update chapter progress after section completion
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed"
- )
-
- except Exception as e:
- # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!)
- self.services.chat.progressLogFinish(sectionOperationId, False)
- elements.append({
- "type": "error",
- "message": f"Error generating section {sectionId}: {str(e)}",
- "sectionId": sectionId
- })
- logger.error(f"Error generating section {sectionId}: {str(e)}")
- # Still update chapter progress even on error
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
- )
- # NICHT raise - Section wird mit Fehlermeldung gerendert
-
- else:
- # Einzelverarbeitung: Jeder Part einzeln ODER Generation ohne ContentParts
- # Handle case where no content parts but generationHint exists (e.g., Executive Summary)
- if len(contentPartIds) == 0 and useAiCall and generationHint:
- # Generate content from scratch using only generationHint
- logger.debug(f"Processing section {sectionId}: No content parts, generating from generationHint only")
- generationPrompt = self._buildSectionGenerationPrompt(
- section=section,
- contentParts=[], # NO PARTS
- userPrompt=userPrompt,
- generationHint=generationHint,
- allSections=all_sections_list,
- sectionIndex=sectionIndex,
- isAggregation=False
- )
-
- # Erstelle Operation-ID für Section-Generierung
- sectionOperationId = f"{fillOperationId}_section_{sectionId}"
-
- # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId)
- self.services.chat.progressLogStart(
- sectionOperationId,
- "Section Generation",
- f"Section {sectionIndex + 1}/{totalSections}",
- f"{sectionTitle} (from generationHint)",
- parentOperationId=chapterOperationId
- )
-
- try:
- # Update: Building prompt
- self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-
- # Debug: Log Prompt
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
-
- # Update: Calling AI
- self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
-
- # Verwende callAi ohne ContentParts
- operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
-
- # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit)
- if operationType == OperationTypeEnum.IMAGE_GENERATE:
- maxPromptLength = 4000
- if len(generationPrompt) > maxPromptLength:
- logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
- # Keep the beginning (task, metadata, generation hint) and truncate from end
- generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline
-
- request = AiCallRequest(
- prompt=generationPrompt,
- contentParts=[], # NO PARTS
- options=AiCallOptions(
- operationType=operationType,
- priority=PriorityEnum.BALANCED,
- processingMode=ProcessingModeEnum.DETAILED
- )
- )
- aiResponse = await self.aiService.callAi(request)
-
- # Update: Processing response
- self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-
- # Debug: Log Response
- self.services.utils.writeDebugFile(
- aiResponse.content,
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
-
- # Update: Validating content
- self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
-
- # Handle IMAGE_GENERATE differently - returns image data directly
- if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE:
- import base64
- base64Data = ""
-
- # Convert image data to base64 string if needed
- if isinstance(aiResponse.content, bytes):
- base64Data = base64.b64encode(aiResponse.content).decode('utf-8')
- elif isinstance(aiResponse.content, str):
- # Check if it's already a JSON structure
- try:
- jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content))
- if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
- elements.append(jsonContent)
- logger.debug("AI returned proper JSON image structure")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- elif isinstance(jsonContent, list) and len(jsonContent) > 0:
- if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
- elements.extend(jsonContent)
- logger.debug("AI returned proper JSON image structure in list")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- else:
- base64Data = "" # Continue with normal processing
- except (json.JSONDecodeError, ValueError, AttributeError):
- base64Data = "" # Will be processed below
-
- # Process base64 if not already handled above
- if base64Data is None:
- # Already processed as JSON, skip base64 processing
- pass
- elif aiResponse.content.startswith("data:image/"):
- # Extract base64 from data URI
- base64Data = aiResponse.content.split(",", 1)[1]
- else:
- content_stripped = aiResponse.content.strip()
- if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]):
- base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "")
- else:
- base64Data = aiResponse.content
- else:
- base64Data = ""
-
- # Always create proper JSON structure for images (if not already processed)
- if base64Data is None:
- # Image already processed as JSON, skip
- pass
- elif base64Data:
- elements.append({
- "type": "image",
- "content": {
- "base64Data": base64Data,
- "altText": generationHint or "Generated image",
- "caption": ""
- }
- })
- logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}")
- else:
- logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}")
- elements.append({
- "type": "error",
- "message": f"Image generation returned empty content",
- "sectionId": sectionId
- })
- else:
- # Parse JSON response for other content types
- try:
- generatedElements = json.loads(
- self.services.utils.jsonExtractString(aiResponse.content)
- )
- if isinstance(generatedElements, list):
- elements.extend(generatedElements)
- elif isinstance(generatedElements, dict) and "elements" in generatedElements:
- elements.extend(generatedElements["elements"])
- elif isinstance(generatedElements, dict) and generatedElements.get("type"):
- elements.append(generatedElements)
- except (json.JSONDecodeError, ValueError) as json_error:
- logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}")
- elements.append({
- "type": "error",
- "message": f"Failed to parse JSON response: {str(json_error)}",
- "sectionId": sectionId
- })
-
- # ChatLog abschließen
- self.services.chat.progressLogFinish(sectionOperationId, True)
-
- # Update chapter progress after section completion
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed"
- )
-
- except Exception as e:
- # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!)
- self.services.chat.progressLogFinish(sectionOperationId, False)
- elements.append({
- "type": "error",
- "message": f"Error generating section {sectionId}: {str(e)}",
- "sectionId": sectionId
- })
- logger.error(f"Error generating section {sectionId}: {str(e)}")
- # Still update chapter progress even on error
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
- )
-
- # Einzelverarbeitung: Jeder Part einzeln
- for partId in contentPartIds:
- part = self._findContentPartById(partId, contentParts)
- if not part:
- continue
-
- contentFormat = contentFormats.get(partId, part.metadata.get("contentFormat"))
-
- if contentFormat == "reference":
- # Füge Dokument-Referenz hinzu
- elements.append({
- "type": "reference",
- "documentReference": part.metadata.get("documentReference"),
- "label": part.metadata.get("usageHint", part.label)
- })
-
- elif contentFormat == "object":
- # Füge base64 Object hinzu (nested in content structure)
- if part.typeGroup == "image":
- elements.append({
- "type": "image",
- "content": {
- "base64Data": part.data,
- "altText": part.metadata.get("usageHint", part.label),
- "caption": part.metadata.get("caption", "")
- }
- })
- else:
- # For other object types, use generic structure
- elements.append({
- "type": part.typeGroup,
- "content": {
- "data": part.data,
- "mimeType": part.mimeType,
- "label": part.metadata.get("usageHint", part.label)
- }
- })
-
- elif contentFormat == "extracted":
- # WICHTIG: Prüfe sowohl useAiCall als auch generationHint
- if useAiCall and generationHint:
- # AI-Call mit einzelnen ContentPart
- logger.debug(f"Processing section {sectionId}: Single extracted part with AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)})")
- generationPrompt = self._buildSectionGenerationPrompt(
- section=section,
- contentParts=[part], # EIN PART
- userPrompt=userPrompt,
- generationHint=generationHint,
- allSections=all_sections_list,
- sectionIndex=sectionIndex,
- isAggregation=False
- )
-
- # Erstelle Operation-ID für Section-Generierung
- sectionOperationId = f"{fillOperationId}_section_{sectionId}"
-
- # Starte ChatLog mit Parent-Referenz (chapter, not fillOperationId)
- self.services.chat.progressLogStart(
- sectionOperationId,
- "Section Generation",
- f"Section {sectionIndex + 1}/{totalSections}",
- f"{sectionTitle} (single part)",
- parentOperationId=chapterOperationId
- )
-
- try:
- # Update: Building prompt
- self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
-
- # Debug: Log Prompt
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
-
- # Update: Calling AI
- self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
-
- # Verwende callAi für ContentParts-Unterstützung
- # Use IMAGE_GENERATE for image content type
- operationType = OperationTypeEnum.IMAGE_GENERATE if contentType == "image" else OperationTypeEnum.DATA_ANALYSE
-
- # For IMAGE_GENERATE, truncate prompt to 4000 chars (DALL-E limit)
- if operationType == OperationTypeEnum.IMAGE_GENERATE:
- maxPromptLength = 4000
- if len(generationPrompt) > maxPromptLength:
- logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
- # Keep the beginning (task, metadata, generation hint) and truncate from end
- generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0] # Truncate at last newline
-
- # For IMAGE_GENERATE, don't pass contentParts - image generation uses prompt only, not content chunks
- contentPartsForCall = [] if operationType == OperationTypeEnum.IMAGE_GENERATE else [part]
- request = AiCallRequest(
- prompt=generationPrompt,
- contentParts=contentPartsForCall,
- options=AiCallOptions(
- operationType=operationType,
- priority=PriorityEnum.BALANCED,
- processingMode=ProcessingModeEnum.DETAILED
- )
- )
- aiResponse = await self.aiService.callAi(request)
-
- # Update: Processing response
- self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
-
- # Debug: Log Response
- self.services.utils.writeDebugFile(
- aiResponse.content,
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
-
- # Update: Validating content
- self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
-
- # Handle IMAGE_GENERATE differently - returns image data directly
- if contentType == "image" and operationType == OperationTypeEnum.IMAGE_GENERATE:
- import base64
- base64Data = ""
-
- # Convert image data to base64 string if needed
- if isinstance(aiResponse.content, bytes):
- base64Data = base64.b64encode(aiResponse.content).decode('utf-8')
- elif isinstance(aiResponse.content, str):
- # Check if it's already a JSON structure
- try:
- jsonContent = json.loads(self.services.utils.jsonExtractString(aiResponse.content))
- if isinstance(jsonContent, dict) and jsonContent.get("type") == "image":
- elements.append(jsonContent)
- logger.debug("AI returned proper JSON image structure")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- elif isinstance(jsonContent, list) and len(jsonContent) > 0:
- if isinstance(jsonContent[0], dict) and jsonContent[0].get("type") == "image":
- elements.extend(jsonContent)
- logger.debug("AI returned proper JSON image structure in list")
- # Skip remaining image processing, but continue with progress updates
- base64Data = None # Signal that image was already processed
- else:
- base64Data = "" # Continue with normal processing
- except (json.JSONDecodeError, ValueError, AttributeError):
- base64Data = "" # Will be processed below
-
- # Process base64 if not already handled above
- if base64Data is None:
- # Already processed as JSON, skip base64 processing
- pass
- elif aiResponse.content.startswith("data:image/"):
- # Extract base64 from data URI
- base64Data = aiResponse.content.split(",", 1)[1]
- else:
- content_stripped = aiResponse.content.strip()
- if len(content_stripped) > 100 and all(c in "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=\n\r\t " for c in content_stripped[:200]):
- base64Data = content_stripped.replace("\n", "").replace("\r", "").replace("\t", "").replace(" ", "")
- else:
- base64Data = aiResponse.content
- else:
- base64Data = ""
-
- # Always create proper JSON structure for images (if not already processed)
- if base64Data is None:
- # Image already processed as JSON, skip
- pass
- elif base64Data:
- elements.append({
- "type": "image",
- "content": {
- "base64Data": base64Data,
- "altText": generationHint or "Generated image",
- "caption": ""
- }
- })
- logger.debug(f"Created proper JSON image structure with base64Data length: {len(base64Data)}")
- else:
- logger.warning(f"IMAGE_GENERATE returned empty content for section {sectionId}")
- elements.append({
- "type": "error",
- "message": f"Image generation returned empty content",
- "sectionId": sectionId
- })
- else:
- # Parse JSON response for other content types
- try:
- generatedElements = json.loads(
- self.services.utils.jsonExtractString(aiResponse.content)
- )
- if isinstance(generatedElements, list):
- elements.extend(generatedElements)
- elif isinstance(generatedElements, dict) and "elements" in generatedElements:
- elements.extend(generatedElements["elements"])
- elif isinstance(generatedElements, dict) and generatedElements.get("type"):
- elements.append(generatedElements)
- except (json.JSONDecodeError, ValueError) as json_error:
- logger.error(f"Error parsing JSON response for section {sectionId}: {str(json_error)}")
- elements.append({
- "type": "error",
- "message": f"Failed to parse JSON response: {str(json_error)}",
- "sectionId": sectionId
- })
-
- # ChatLog abschließen
- self.services.chat.progressLogFinish(sectionOperationId, True)
-
- # Update chapter progress after section completion
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed"
- )
-
- except Exception as e:
- # Fehlerhafte Section mit Fehlermeldung rendern (kein Abbruch!)
- self.services.chat.progressLogFinish(sectionOperationId, False)
- elements.append({
- "type": "error",
- "message": f"Error generating section {sectionId}: {str(e)}",
- "sectionId": sectionId
- })
- logger.error(f"Error generating section {sectionId}: {str(e)}")
- # Still update chapter progress even on error
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed (with errors)"
- )
- # NICHT raise - Section wird mit Fehlermeldung gerendert
- else:
- # Füge extrahierten Content direkt hinzu (kein AI-Call)
- # CRITICAL: Check part typeGroup to determine correct element type
- if part.typeGroup == "image":
- # Image content should be added as image element, not extracted_text
- logger.debug(f"Processing section {sectionId}: Single extracted IMAGE part WITHOUT AI call - adding as image element")
- elements.append({
- "type": "image",
- "content": {
- "base64Data": part.data,
- "altText": part.metadata.get("usageHint", part.label),
- "caption": part.metadata.get("caption", "")
- }
- })
- else:
- # Text content - add as extracted_text element
- logger.debug(f"Processing section {sectionId}: Single extracted TEXT part WITHOUT AI call (useAiCall={useAiCall}, generationHint={bool(generationHint)}) - adding extracted text directly")
- elements.append({
- "type": "extracted_text",
- "content": part.data,
- "source": part.metadata.get("documentId"),
- "extractionPrompt": part.metadata.get("extractionPrompt")
- })
-
- # Assign elements to section (for all processing paths)
- section["elements"] = elements
-
- # Update chapter progress after section completion (for all sections, including non-AI)
- chapterProgress = (sectionIndex + 1) / totalSections if totalSections > 0 else 1.0
- self.services.chat.progressLogUpdate(
- chapterOperationId,
- chapterProgress,
- f"Section {sectionIndex + 1}/{totalSections} completed"
- )
-
- # Update overall progress after section completion
- overallProgress = calculateOverallProgress(chapterIndex - 1, totalChapters, sectionIndex + 1, totalSections)
- self.services.chat.progressLogUpdate(
- fillOperationId,
- overallProgress,
- f"Chapter {chapterIndex}/{totalChapters}, Section {sectionIndex + 1}/{totalSections} completed"
- )
+ # Process results in order and assign elements to sections
+ for (originalIndex, originalSection, _), result in zip(sectionTasks, results):
+ if isinstance(result, Exception):
+ logger.error(f"Error processing section {originalSection.get('id')}: {str(result)}")
+ # Set error element
+ originalSection["elements"] = [{
+ "type": "error",
+ "message": f"Error processing section: {str(result)}",
+ "sectionId": originalSection.get("id")
+ }]
+ else:
+ # Assign elements to section in correct order
+ originalSection["elements"] = result
# Finish chapter operation after all sections processed
self.services.chat.progressLogFinish(chapterOperationId, True)
@@ -1153,7 +1423,8 @@ class StructureFiller:
contentPartIds: List[str],
contentPartInstructions: Dict[str, Any],
contentParts: List[ContentPart],
- userPrompt: str
+ userPrompt: str,
+ language: str = "en"
) -> str:
"""Baue Prompt für Chapter-Sections-Struktur-Generierung."""
# Baue ContentParts-Index (nur IDs, keine Previews!)
@@ -1176,6 +1447,8 @@ class StructureFiller:
prompt = f"""TASK: Generate Chapter Sections Structure
+LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
+
CHAPTER: {chapterTitle} (Level {chapterLevel}, ID: {chapterId})
GENERATION HINT: {generationHint}
@@ -1245,7 +1518,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th
generationHint: str,
allSections: Optional[List[Dict[str, Any]]] = None,
sectionIndex: Optional[int] = None,
- isAggregation: bool = False
+ isAggregation: bool = False,
+ language: str = "en"
) -> str:
"""Baue Prompt für Section-Generierung mit vollständigem Kontext."""
# Filtere None-Werte
@@ -1344,6 +1618,8 @@ CRITICAL: Return ONLY valid JSON. Do not include any explanatory text outside th
if isAggregation:
prompt = f"""# TASK: Generate Section Content (Aggregation)
+LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
+
## SECTION METADATA
- Section ID: {sectionId}
- Content Type: {contentType}
@@ -1392,6 +1668,8 @@ CRITICAL:
else:
prompt = f"""# TASK: Generate Section Content
+LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
+
## SECTION METADATA
- Section ID: {sectionId}
- Content Type: {contentType}
diff --git a/modules/services/serviceAi/subStructureGeneration.py b/modules/services/serviceAi/subStructureGeneration.py
index d3b46e0e..f4a26b5b 100644
--- a/modules/services/serviceAi/subStructureGeneration.py
+++ b/modules/services/serviceAi/subStructureGeneration.py
@@ -76,7 +76,30 @@ class StructureGenerator:
)
# Parse Struktur
- structure = json.loads(self.services.utils.jsonExtractString(aiResponse))
+ # Use tryParseJson which handles malformed JSON and unterminated strings
+ extractedJson = self.services.utils.jsonExtractString(aiResponse)
+ parsedJson, parseError, cleanedJson = self.services.utils.jsonTryParse(extractedJson)
+
+ if parseError is not None:
+ # Try to repair broken JSON (handles unterminated strings, incomplete structures, etc.)
+ logger.warning(f"Initial JSON parsing failed: {str(parseError)}. Attempting repair...")
+ from modules.shared import jsonUtils
+ repairedJson = jsonUtils.repairBrokenJson(extractedJson)
+ if repairedJson:
+ # Try parsing repaired JSON
+ parsedJson, parseError, _ = self.services.utils.jsonTryParse(json.dumps(repairedJson))
+ if parseError is None:
+ logger.info("Successfully repaired and parsed JSON structure")
+ structure = parsedJson
+ else:
+ logger.error(f"Failed to parse repaired JSON: {str(parseError)}")
+ raise ValueError(f"Failed to parse JSON structure after repair: {str(parseError)}")
+ else:
+ logger.error(f"Failed to repair JSON. Parse error: {str(parseError)}")
+ logger.error(f"Cleaned JSON preview (first 500 chars): {cleanedJson[:500]}")
+ raise ValueError(f"Failed to parse JSON structure: {str(parseError)}")
+ else:
+ structure = parsedJson
# ChatLog abschließen
self.services.chat.progressLogFinish(structureOperationId, True)
@@ -145,11 +168,17 @@ class StructureGenerator:
if not contentPartsIndex:
contentPartsIndex = "\n(No content parts available)"
+ # Extract language from user prompt or default to "de" (can be detected from userPrompt)
+ # For now, default to "de" - can be enhanced with language detection later
+ language = "en" # Default language
+
prompt = f"""USER REQUEST (for context):
```
{userPrompt}
```
+LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
+
AVAILABLE CONTENT PARTS:
{contentPartsIndex}
From 0d77263fb7cd37e3314d663651084f156bc035bf Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Dec 2025 02:13:18 +0100
Subject: [PATCH 3/5] fixed language of prompt
---
.../serviceAi/subStructureGeneration.py | 22 +++++++++++++++----
1 file changed, 18 insertions(+), 4 deletions(-)
diff --git a/modules/services/serviceAi/subStructureGeneration.py b/modules/services/serviceAi/subStructureGeneration.py
index f4a26b5b..bee83706 100644
--- a/modules/services/serviceAi/subStructureGeneration.py
+++ b/modules/services/serviceAi/subStructureGeneration.py
@@ -24,6 +24,20 @@ class StructureGenerator:
self.services = services
self.aiService = aiService
+ def _getUserLanguage(self) -> str:
+ """Get user language for document generation"""
+ try:
+ if self.services:
+ # Prefer detected language if available (from user intention analysis)
+ if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage:
+ return self.services.currentUserLanguage
+ # Fallback to user's preferred language
+ elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'):
+ return self.services.user.language
+ except Exception:
+ pass
+ return 'en' # Default fallback
+
async def generateStructure(
self,
userPrompt: str,
@@ -168,9 +182,9 @@ class StructureGenerator:
if not contentPartsIndex:
contentPartsIndex = "\n(No content parts available)"
- # Extract language from user prompt or default to "de" (can be detected from userPrompt)
- # For now, default to "de" - can be enhanced with language detection later
- language = "en" # Default language
+ # Get language from services (user intention analysis)
+ language = self._getUserLanguage()
+ logger.debug(f"Using language from services (user intention analysis) for structure generation: {language}")
prompt = f"""USER REQUEST (for context):
```
@@ -228,7 +242,7 @@ RETURN JSON:
{{
"metadata": {{
"title": "Document Title",
- "language": "de"
+ "language": "{language}"
}},
"documents": [{{
"id": "doc_1",
From cb7ed7cf51986871208b48e5ea7ad4046868ded1 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Dec 2025 10:48:16 +0100
Subject: [PATCH 4/5] web service parallelized
---
.../services/serviceAi/subAiCallLooping.py | 37 +-
.../services/serviceAi/subStructureFilling.py | 69 ++--
modules/services/serviceWeb/mainServiceWeb.py | 342 +++++++++++-------
3 files changed, 285 insertions(+), 163 deletions(-)
diff --git a/modules/services/serviceAi/subAiCallLooping.py b/modules/services/serviceAi/subAiCallLooping.py
index bb1824c2..6e2c90b5 100644
--- a/modules/services/serviceAi/subAiCallLooping.py
+++ b/modules/services/serviceAi/subAiCallLooping.py
@@ -122,10 +122,14 @@ class AiCallLooper:
)
# Write the ACTUAL prompt sent to AI
- if iteration == 1:
- self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
- else:
- self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
+ # For section content generation: only write one prompt file (first iteration)
+ # For document generation: write prompt for each iteration
+ isSectionContent = "_section_" in debugPrefix
+ if iteration == 1 or not isSectionContent:
+ if iteration == 1:
+ self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
+ elif not isSectionContent:
+ self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiService.callAi(request)
result = response.content
@@ -146,10 +150,13 @@ class AiCallLooper:
self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})")
# Write raw AI response to debug file
- if iteration == 1:
- self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
- else:
- self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
+ # For section content generation: only write one response file (first iteration)
+ # For document generation: write response for each iteration
+ if iteration == 1 or not isSectionContent:
+ if iteration == 1:
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
+ elif not isSectionContent:
+ self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration (only if workflow exists and has id)
if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
@@ -219,9 +226,9 @@ class AiCallLooper:
logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
- # Write final result
+ # Note: Debug files (_prompt and _response) are already written above for iteration 1
+ # No need to write _final_result as it's redundant with _response
final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
- self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result")
return final_json
# Extract sections from response (handles both valid and broken JSON)
@@ -397,7 +404,10 @@ class AiCallLooper:
self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})")
# Log merged sections for debugging
- self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
+ # For section content generation: skip merged sections debug files (only one prompt/response needed)
+ isSectionContent = "_section_" in debugPrefix
+ if not isSectionContent:
+ self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
# Check if we should continue (completion detection)
# Simple logic: JSON completeness determines continuation
@@ -465,7 +475,10 @@ class AiCallLooper:
final_result = self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)
# Write final result to debug file
- self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
+ # For section content generation: skip final_result debug file (response already written)
+ isSectionContent = "_section_" in debugPrefix
+ if not isSectionContent:
+ self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
return final_result
diff --git a/modules/services/serviceAi/subStructureFilling.py b/modules/services/serviceAi/subStructureFilling.py
index 75642b48..138f6572 100644
--- a/modules/services/serviceAi/subStructureFilling.py
+++ b/modules/services/serviceAi/subStructureFilling.py
@@ -537,11 +537,6 @@ class StructureFiller:
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt (aggregation)")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -553,6 +548,12 @@ class StructureFiller:
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -564,6 +565,12 @@ class StructureFiller:
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
async def buildSectionPromptWithContinuation(
section: Dict[str, Any],
@@ -665,11 +672,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response (aggregation)")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -735,11 +738,6 @@ The JSON should be a fragment that can be merged with the previous response."""
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -751,6 +749,12 @@ The JSON should be a fragment that can be merged with the previous response."""
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -762,6 +766,12 @@ The JSON should be a fragment that can be merged with the previous response."""
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
isAggregation = False
@@ -865,11 +875,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
@@ -968,11 +974,6 @@ The JSON should be a fragment that can be merged with the previous response."""
try:
self.services.chat.progressLogUpdate(sectionOperationId, 0.2, "Building generation prompt")
- self.services.utils.writeDebugFile(
- generationPrompt,
- f"{chapterId}_section_{sectionId}_prompt"
- )
- logger.debug(f"Logged section prompt: {chapterId}_section_{sectionId}_prompt")
self.services.chat.progressLogUpdate(sectionOperationId, 0.4, "Calling AI for content generation")
@@ -984,6 +985,12 @@ The JSON should be a fragment that can be merged with the previous response."""
logger.warning(f"Truncating DALL-E prompt from {len(generationPrompt)} to {maxPromptLength} characters")
generationPrompt = generationPrompt[:maxPromptLength].rsplit('\n', 1)[0]
+ # Write debug file for IMAGE_GENERATE (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ generationPrompt,
+ f"{chapterId}_section_{sectionId}_prompt"
+ )
+
request = AiCallRequest(
prompt=generationPrompt,
contentParts=[],
@@ -995,6 +1002,12 @@ The JSON should be a fragment that can be merged with the previous response."""
)
aiResponse = await self.aiService.callAi(request)
generatedElements = []
+
+ # Write debug file for IMAGE_GENERATE response (direct callAi, no _callAiWithLooping)
+ self.services.utils.writeDebugFile(
+ aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
+ f"{chapterId}_section_{sectionId}_response"
+ )
else:
isAggregation = False
@@ -1098,11 +1111,7 @@ The JSON should be a fragment that can be merged with the previous response."""
generatedElements = []
self.services.chat.progressLogUpdate(sectionOperationId, 0.6, "Processing AI response")
- self.services.utils.writeDebugFile(
- aiResponse.content if hasattr(aiResponse, 'content') else str(aiResponse),
- f"{chapterId}_section_{sectionId}_response"
- )
- logger.debug(f"Logged section response: {chapterId}_section_{sectionId}_response")
+ # Note: Debug files are written by _callAiWithLooping using debugPrefix
self.services.chat.progressLogUpdate(sectionOperationId, 0.8, "Validating generated content")
diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py
index 18176a92..50f7a84c 100644
--- a/modules/services/serviceWeb/mainServiceWeb.py
+++ b/modules/services/serviceWeb/mainServiceWeb.py
@@ -8,6 +8,8 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
import json
import logging
import time
+import asyncio
+from urllib.parse import urlparse
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
@@ -99,12 +101,18 @@ class WebService:
self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")
- # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
- if len(allUrls) > maxNumberPages:
- allUrls = allUrls[:maxNumberPages]
+ # Step 3: Validate and filter URLs before crawling
+ validatedUrls = self._validateUrls(allUrls)
+ if not validatedUrls:
+ logger.warning(f"All {len(allUrls)} URLs failed validation")
+ return {"error": "No valid URLs found to crawl"}
+
+ # Filter to maxNumberPages (simple cut, no intelligent filtering)
+ if len(validatedUrls) > maxNumberPages:
+ validatedUrls = validatedUrls[:maxNumberPages]
logger.info(f"Limited URLs to {maxNumberPages}")
- if not allUrls:
+ if not validatedUrls:
return {"error": "No URLs found to crawl"}
# Step 4: Translate researchDepth to maxDepth
@@ -114,14 +122,14 @@ class WebService:
# Step 5: Crawl all URLs with hierarchical logging
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating")
- self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
+ self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs")
# Use parent operation ID directly (parentId should be operationId, not log entry ID)
parentOperationId = operationId # Use the parent's operationId directly
crawlResult = await self._performWebCrawl(
instruction=instruction,
- urls=allUrls,
+ urls=validatedUrls,
maxDepth=maxDepth,
parentOperationId=parentOperationId
)
@@ -194,24 +202,24 @@ class WebService:
"max_depth": maxDepth,
"country": countryCode,
"language": languageCode,
- "urls_crawled": allUrls[:20], # First 20 URLs for reference
- "total_urls": len(allUrls),
+ "urls_crawled": validatedUrls[:20], # First 20 URLs for reference
+ "total_urls": len(validatedUrls),
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength,
"crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None
},
"sections": sections,
"statistics": {
- "sectionCount": len(sections),
- "total_urls": len(allUrls),
+ "sectionCount": len(sections),
+ "total_urls": len(validatedUrls),
"results_count": totalResults,
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength
},
# Keep original structure for backward compatibility
"instruction": instruction,
- "urls_crawled": allUrls,
- "total_urls": len(allUrls),
+ "urls_crawled": validatedUrls,
+ "total_urls": len(validatedUrls),
"results": crawlResult,
"total_results": totalResults
}
@@ -383,6 +391,50 @@ Return ONLY valid JSON, no additional text:
logger.error(f"Error in web search: {str(e)}")
return []
+ def _validateUrls(self, urls: List[str]) -> List[str]:
+ """
+ Validate URLs before crawling - filters out invalid URLs.
+
+ Args:
+ urls: List of URLs to validate
+
+ Returns:
+ List of valid URLs
+ """
+ validatedUrls = []
+ for url in urls:
+ if not url or not isinstance(url, str):
+ logger.debug(f"Skipping invalid URL (not a string): {url}")
+ continue
+
+ url = url.strip()
+ if not url:
+ logger.debug(f"Skipping empty URL")
+ continue
+
+ # Basic URL validation using urlparse
+ try:
+ parsed = urlparse(url)
+ # Check if URL has at least scheme and netloc
+ if not parsed.scheme or not parsed.netloc:
+ logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}")
+ continue
+
+ # Only allow http/https schemes
+ if parsed.scheme not in ['http', 'https']:
+ logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}")
+ continue
+
+ validatedUrls.append(url)
+ logger.debug(f"Validated URL: {url}")
+
+ except Exception as e:
+ logger.warning(f"Error validating URL '{url}': {str(e)}")
+ continue
+
+ logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs")
+ return validatedUrls
+
async def _performWebCrawl(
self,
instruction: str,
@@ -390,117 +442,165 @@ Return ONLY valid JSON, no additional text:
maxDepth: int = 2,
parentOperationId: Optional[str] = None
) -> List[Dict[str, Any]]:
- """Perform web crawl on list of URLs - calls plugin for each URL individually."""
- crawlResults = []
-
- # Loop over each URL and crawl one at a time
+ """Perform web crawl on list of URLs - crawls URLs in parallel for better performance."""
+ # Create tasks for parallel crawling
+ crawlTasks = []
for urlIndex, url in enumerate(urls):
- # Create separate operation for each URL with parent reference
- urlOperationId = None
- if parentOperationId:
- workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
- urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
- self.services.chat.progressLogStart(
- urlOperationId,
- "Web Crawl",
- f"URL {urlIndex + 1}",
- url[:50] + "..." if len(url) > 50 else url,
- parentOperationId=parentOperationId
- )
-
- try:
- logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}")
-
- if urlOperationId:
- displayUrl = url[:50] + "..." if len(url) > 50 else url
- self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
- self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
-
- # Build crawl prompt model for single URL
- crawlPromptModel = AiCallPromptWebCrawl(
- instruction=instruction,
- url=url, # Single URL
- maxDepth=maxDepth,
- maxWidth=5 # Default: 5 pages per level
- )
- crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
-
- # Debug: persist crawl prompt (with URL identifier in content for clarity)
- debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
- self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
-
- # Call AI with WEB_CRAWL operation
- crawlOptions = AiCallOptions(
- operationType=OperationTypeEnum.WEB_CRAWL,
- resultFormat="json"
- )
-
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
-
- # Use unified callAiContent method with parentOperationId for hierarchical logging
- crawlResponse = await self.services.ai.callAiContent(
- prompt=crawlPrompt,
- options=crawlOptions,
- outputFormat="json",
- parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
- )
-
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
-
- # Extract content from AiResponse
- crawlResult = crawlResponse.content
-
- # Debug: persist crawl response
- if isinstance(crawlResult, str):
- self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
- else:
- self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
-
- # Parse crawl result
- if isinstance(crawlResult, str):
- try:
- # Extract JSON from response (handles markdown code blocks)
- extractedJson = self.services.utils.jsonExtractString(crawlResult)
- crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
- except:
- crawlData = {"url": url, "content": crawlResult}
- else:
- crawlData = crawlResult
-
- # Process crawl results and create hierarchical progress logging for sub-URLs
- if urlOperationId:
- self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
-
- # Recursively process crawl results to find nested URLs and create child operations
- processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
-
- # Count total URLs crawled (including sub-URLs) for progress message
- totalUrlsCrawled = self._countUrlsInResults(processedResults)
-
- # Ensure it's a list of results
- if isinstance(processedResults, list):
- crawlResults.extend(processedResults)
- elif isinstance(processedResults, dict):
- crawlResults.append(processedResults)
- else:
- crawlResults.append({"url": url, "content": str(processedResults)})
-
- if urlOperationId:
- if totalUrlsCrawled > 1:
- self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
- else:
- self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
- self.services.chat.progressLogFinish(urlOperationId, True)
-
- except Exception as e:
- logger.error(f"Error crawling URL {url}: {str(e)}")
- if urlOperationId:
- self.services.chat.progressLogFinish(urlOperationId, False)
- crawlResults.append({"url": url, "error": str(e)})
+ task = self._crawlSingleUrl(
+ url=url,
+ urlIndex=urlIndex,
+ totalUrls=len(urls),
+ instruction=instruction,
+ maxDepth=maxDepth,
+ parentOperationId=parentOperationId
+ )
+ crawlTasks.append(task)
- return crawlResults
+ # Execute all crawl tasks in parallel
+ logger.info(f"Starting parallel crawl of {len(urls)} URLs")
+ crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True)
+
+ # Process results and handle exceptions
+ processedResults = []
+ for idx, result in enumerate(crawlResults):
+        if isinstance(result, BaseException):
+ logger.error(f"Error crawling URL {urls[idx]}: {str(result)}")
+ processedResults.append({"url": urls[idx], "error": str(result)})
+ else:
+ processedResults.extend(result if isinstance(result, list) else [result])
+
+ logger.info(f"Completed parallel crawl: {len(processedResults)} results")
+ return processedResults
+
+ async def _crawlSingleUrl(
+ self,
+ url: str,
+ urlIndex: int,
+ totalUrls: int,
+ instruction: str,
+ maxDepth: int,
+ parentOperationId: Optional[str] = None
+ ) -> List[Dict[str, Any]]:
+ """
+ Crawl a single URL - called in parallel for multiple URLs.
+
+ Args:
+ url: URL to crawl
+ urlIndex: Index of URL in the list
+ totalUrls: Total number of URLs being crawled
+ instruction: Research instruction
+ maxDepth: Maximum crawl depth
+ parentOperationId: Parent operation ID for progress tracking
+
+ Returns:
+ List of crawl results for this URL
+ """
+ # Create separate operation for each URL with parent reference
+ urlOperationId = None
+ if parentOperationId:
+ workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
+ urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
+ self.services.chat.progressLogStart(
+ urlOperationId,
+ "Web Crawl",
+ f"URL {urlIndex + 1}/{totalUrls}",
+ url[:50] + "..." if len(url) > 50 else url,
+ parentOperationId=parentOperationId
+ )
+
+ try:
+ logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}")
+
+ if urlOperationId:
+ displayUrl = url[:50] + "..." if len(url) > 50 else url
+ self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
+ self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
+
+ # Build crawl prompt model for single URL
+ crawlPromptModel = AiCallPromptWebCrawl(
+ instruction=instruction,
+ url=url, # Single URL
+ maxDepth=maxDepth,
+ maxWidth=5 # Default: 5 pages per level
+ )
+ crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
+
+ # Debug: persist crawl prompt (with URL identifier in content for clarity)
+ debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
+ self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
+
+ # Call AI with WEB_CRAWL operation
+ crawlOptions = AiCallOptions(
+ operationType=OperationTypeEnum.WEB_CRAWL,
+ resultFormat="json"
+ )
+
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
+
+ # Use unified callAiContent method with parentOperationId for hierarchical logging
+ crawlResponse = await self.services.ai.callAiContent(
+ prompt=crawlPrompt,
+ options=crawlOptions,
+ outputFormat="json",
+ parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
+ )
+
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
+
+ # Extract content from AiResponse
+ crawlResult = crawlResponse.content
+
+ # Debug: persist crawl response
+ if isinstance(crawlResult, str):
+ self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
+ else:
+ self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
+
+ # Parse crawl result
+ if isinstance(crawlResult, str):
+ try:
+ # Extract JSON from response (handles markdown code blocks)
+ extractedJson = self.services.utils.jsonExtractString(crawlResult)
+ crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
+            except Exception:
+ crawlData = {"url": url, "content": crawlResult}
+ else:
+ crawlData = crawlResult
+
+ # Process crawl results and create hierarchical progress logging for sub-URLs
+ if urlOperationId:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
+
+ # Recursively process crawl results to find nested URLs and create child operations
+ processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
+
+ # Count total URLs crawled (including sub-URLs) for progress message
+ totalUrlsCrawled = self._countUrlsInResults(processedResults)
+
+ # Ensure it's a list of results
+ if isinstance(processedResults, list):
+ results = processedResults
+ elif isinstance(processedResults, dict):
+ results = [processedResults]
+ else:
+ results = [{"url": url, "content": str(processedResults)}]
+
+ if urlOperationId:
+ if totalUrlsCrawled > 1:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
+ else:
+ self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
+ self.services.chat.progressLogFinish(urlOperationId, True)
+
+ return results
+
+ except Exception as e:
+ logger.error(f"Error crawling URL {url}: {str(e)}")
+ if urlOperationId:
+ self.services.chat.progressLogFinish(urlOperationId, False)
+ return [{"url": url, "error": str(e)}]
def _processCrawlResultsWithHierarchy(
self,
From 0d55e401583f5c1f1e8d8ab22cde0499b1b3a082 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Dec 2025 11:11:31 +0100
Subject: [PATCH 5/5] fixed dashboard content
---
.../serviceGeneration/renderers/rendererHtml.py | 9 +++++++--
.../renderers/rendererMarkdown.py | 15 +++++++++++++--
modules/shared/progressLogger.py | 2 +-
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/modules/services/serviceGeneration/renderers/rendererHtml.py b/modules/services/serviceGeneration/renderers/rendererHtml.py
index 47fecffa..1f013e50 100644
--- a/modules/services/serviceGeneration/renderers/rendererHtml.py
+++ b/modules/services/serviceGeneration/renderers/rendererHtml.py
@@ -417,7 +417,10 @@ class RendererHtml(BaseRenderer):
if htmlParts:
return '\n'.join(htmlParts)
- return self._renderJsonParagraph(sectionData, styles)
+ # If sectionData is not a list, treat it as a dict
+ if isinstance(sectionData, dict):
+ return self._renderJsonParagraph(sectionData, styles)
+ return ""
elif sectionType == "code_block":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
@@ -451,7 +454,9 @@ class RendererHtml(BaseRenderer):
if htmlParts:
return '\n'.join(htmlParts)
# Fallback to paragraph for unknown types
- return self._renderJsonParagraph(sectionData, styles)
+ if isinstance(sectionData, dict):
+ return self._renderJsonParagraph(sectionData, styles)
+ return ""
except Exception as e:
self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}")
diff --git a/modules/services/serviceGeneration/renderers/rendererMarkdown.py b/modules/services/serviceGeneration/renderers/rendererMarkdown.py
index 4b372bb2..84644485 100644
--- a/modules/services/serviceGeneration/renderers/rendererMarkdown.py
+++ b/modules/services/serviceGeneration/renderers/rendererMarkdown.py
@@ -162,7 +162,13 @@ class RendererMarkdown(BaseRenderer):
return self._renderJsonHeading(element)
return ""
elif sectionType == "paragraph":
- return self._renderJsonParagraph(sectionData)
+ # Work directly with elements like other renderers
+ if isinstance(sectionData, list) and sectionData:
+ element = sectionData[0] if isinstance(sectionData[0], dict) else {}
+ return self._renderJsonParagraph(element)
+ elif isinstance(sectionData, dict):
+ return self._renderJsonParagraph(sectionData)
+ return ""
elif sectionType == "code_block":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
@@ -177,7 +183,12 @@ class RendererMarkdown(BaseRenderer):
return ""
else:
# Fallback to paragraph for unknown types
- return self._renderJsonParagraph(sectionData)
+ if isinstance(sectionData, list) and sectionData:
+ element = sectionData[0] if isinstance(sectionData[0], dict) else {}
+ return self._renderJsonParagraph(element)
+ elif isinstance(sectionData, dict):
+ return self._renderJsonParagraph(sectionData)
+ return ""
except Exception as e:
self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}")
diff --git a/modules/shared/progressLogger.py b/modules/shared/progressLogger.py
index 8c6e56f8..bd8b9779 100644
--- a/modules/shared/progressLogger.py
+++ b/modules/shared/progressLogger.py
@@ -132,7 +132,7 @@ class ProgressLogger:
return None
op = self.activeOperations[operationId]
- message = f"Service {op['service']}"
+ message = f"{op['service']}"
workflow = self.services.workflow
if not workflow: