parallel processing for rendering

This commit is contained in:
ValueOn AG 2025-12-30 02:06:51 +01:00
parent a958defd42
commit fa57d3683b
6 changed files with 1542 additions and 952 deletions


@@ -0,0 +1,376 @@
# Parallel Processing Refactoring Concept
## Current State (Sequential)
### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`)
- **Current**: Processes chapters sequentially, one after another
- **Flow**:
1. Iterate through documents
2. For each document, iterate through chapters
3. For each chapter, generate sections structure using AI
4. Update progress after each chapter
### Section Content Generation (`_fillChapterSections`)
- **Current**: Processes chapters sequentially, sections within each chapter sequentially
- **Flow**:
1. Iterate through documents
2. For each document, iterate through chapters
3. For each chapter, iterate through sections
4. For each section, generate content using AI
5. Update progress after each section
## Desired State (Parallel)
### Chapter Sections Structure Generation
- **Target**: Process all chapters in parallel
- **Requirements**:
- Maintain chapter order in final result
- Each chapter can be processed independently
- Progress updates should reflect parallel processing
- Errors in one chapter should not stop others
### Section Content Generation
- **Target**: Process sections within each chapter in parallel
- **Requirements**:
- Maintain section order within each chapter
- Sections within a chapter can be processed independently
- Chapters still processed sequentially (to maintain order)
- Progress updates should reflect parallel processing
- Errors in one section should not stop others
## Implementation Strategy
### Phase 1: Chapter Sections Structure Generation Parallelization
#### Step 1.1: Extract Single Chapter Processing
- **Create**: `_generateSingleChapterSectionsStructure()` method
- **Purpose**: Process one chapter independently
- **Parameters**:
- `chapter`: Chapter dict
- `chapterIndex`: Index for ordering
- `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata
- `generationHint`: Generation instructions
- `contentPartIds`, `contentPartInstructions`: Content part info
- `contentParts`: Full content parts list
- `userPrompt`: User's original prompt
- `language`: Language for generation
- `parentOperationId`: For progress logging
- **Returns**: None (modifies chapter dict in place)
- **Error Handling**: Logs errors, raises exception to be caught by caller
#### Step 1.2: Refactor Main Method
- **Modify**: `_generateChapterSectionsStructure()`
- **Changes**:
1. Collect all chapters with their indices
2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure`
3. Use `asyncio.gather()` to execute all tasks in parallel
4. Process results in order (using `zip` with original order)
5. Handle errors per chapter (don't fail entire operation)
6. Update progress after each chapter completes
#### Step 1.3: Progress Reporting
- **Maintain**: Overall progress tracking
- **Update**: Progress after each chapter completes (not sequentially)
- **Format**: "Chapter X/Y completed" or "Chapter X/Y error"
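Steps 1.1-1.3 can be sketched as follows, assuming each chapter's AI call is independent; `_generate_single` is a hypothetical stand-in for the real `_generateSingleChapterSectionsStructure`:

```python
import asyncio
from typing import Any, Dict, List

# Hypothetical stand-in for the per-chapter AI call
async def _generate_single(chapter: Dict[str, Any], index: int) -> None:
    await asyncio.sleep(0.01 * (3 - index))  # later chapters finish first, on purpose
    chapter["sections"] = [f"section for chapter {index}"]

async def generate_all(chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    tasks = [_generate_single(ch, i) for i, ch in enumerate(chapters)]
    # return_exceptions=True: a failure in one chapter does not cancel the others
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for chapter, result in zip(chapters, results):
        if isinstance(result, Exception):
            chapter["error"] = str(result)  # mark the chapter, keep going
    return chapters  # the list order was never changed

chapters = [{"id": c} for c in ("a", "b", "c")]
done = asyncio.run(generate_all(chapters))
```

Note that `asyncio.gather()` returns results in task-submission order regardless of completion order, which is what makes the `zip` safe.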
### Phase 2: Section Content Generation Parallelization
#### Step 2.1: Extract Single Section Processing
- **Create**: `_processSingleSection()` method
- **Purpose**: Process one section independently
- **Parameters**:
- `section`: Section dict
- `sectionIndex`: Index for ordering
- `totalSections`: Total sections in chapter
- `chapterIndex`: Chapter index
- `totalChapters`: Total chapters
- `chapterId`: Chapter ID
- `chapterOperationId`: Chapter progress operation ID
- `fillOperationId`: Overall fill operation ID
- `contentParts`: Full content parts list
- `userPrompt`: User's original prompt
- `all_sections_list`: All sections for context
- `language`: Language for generation
- `calculateOverallProgress`: Function to calculate overall progress
- **Returns**: `List[Dict[str, Any]]` (elements for the section)
- **Error Handling**: Returns error element instead of raising
#### Step 2.2: Extract Section Processing Logic
- **Create**: Helper methods for different processing paths:
- `_processSectionAggregation()`: Handle aggregation path (multiple parts)
- `_processSectionGeneration()`: Handle generation without parts (only generationHint)
- `_processSectionParts()`: Handle individual part processing
- **Purpose**: Keep logic organized and reusable
#### Step 2.3: Refactor Main Method
- **Modify**: `_fillChapterSections()`
- **Changes**:
1. Keep sequential chapter processing (maintains order)
2. For each chapter, collect all sections with indices
3. Create async tasks for each section using `_processSingleSection`
4. Use `asyncio.gather()` to execute all section tasks in parallel
5. Process results in order (using `zip` with original order)
6. Assign elements to sections in correct order
7. Update progress after each section completes
8. Handle errors per section (don't fail entire chapter)
#### Step 2.4: Progress Reporting
- **Maintain**: Hierarchical progress tracking
- **Update**:
- Section progress: After each section completes
- Chapter progress: After all sections in chapter complete
- Overall progress: After each section/chapter completes
- **Format**: "Chapter X/Y, Section A/B completed"
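The Phase 2 shape — sequential chapters, parallel sections, error elements instead of exceptions — can be sketched like this (the AI call is replaced by a placeholder):

```python
import asyncio
from typing import Any, Dict, List

async def _process_single_section(section: Dict[str, Any]) -> List[Dict[str, Any]]:
    try:
        await asyncio.sleep(0)  # placeholder for the real AI call
        if section.get("broken"):
            raise ValueError("generation failed")
        return [{"type": "paragraph", "text": f"content for {section['id']}"}]
    except Exception as exc:
        # Step 2.1: return an error element instead of raising, so siblings keep running
        return [{"type": "error", "text": str(exc)}]

async def fill_chapters(chapters: List[Dict[str, Any]]) -> None:
    for chapter in chapters:  # chapters stay sequential (Step 2.3, point 1)
        sections = chapter["sections"]
        results = await asyncio.gather(*(_process_single_section(s) for s in sections))
        for section, elements in zip(sections, results):  # gather preserves task order
            section["elements"] = elements

chapters = [{"sections": [{"id": "s1"}, {"id": "s2", "broken": True}]}]
asyncio.run(fill_chapters(chapters))
```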
## Key Considerations
### Order Preservation
- **Chapters** (Phase 1): the final result must keep document order even though chapters are generated in parallel
- **Sections** (Phase 2): chapters are still processed sequentially; sections within a chapter run in parallel but must keep their order
- **Solution**: `asyncio.gather()` returns results in task-submission order, so `zip` the results with the original chapter/section list
### Error Handling
- **Chapters**: Error in one chapter should not stop others
- **Sections**: Error in one section should not stop others
- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)`
### Progress Reporting
- **Challenge**: Progress updates happen out of order
- **Solution**: Update progress when each task completes, not sequentially
- **Format**: Show completed count, not sequential position
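The "completed count" idea can be sketched as a wrapper around `asyncio.gather()`; the counter is safe because asyncio runs on a single event loop and there is no await between increment and report:

```python
import asyncio
from typing import Awaitable, Callable, List

async def run_with_progress(coros: List[Awaitable], report: Callable[[str], None]):
    total = len(coros)
    done_count = 0

    async def tracked(coro):
        nonlocal done_count
        result = await coro
        done_count += 1  # no preemption between these two lines on one event loop
        report(f"{done_count}/{total} completed")  # completed count, not position
        return result

    # gather still returns results in submission order
    return await asyncio.gather(*(tracked(c) for c in coros))

messages: List[str] = []

async def work(n: int) -> int:
    await asyncio.sleep(0.01 * (3 - n))  # task 2 finishes first, task 0 last
    return n

results = asyncio.run(run_with_progress([work(i) for i in range(3)], messages.append))
```

Even though the tasks complete in reverse order, the reported counts increase monotonically and the result list follows the input order.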
### Shared State
- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent)
- **Sections**: Return elements, assign to sections in order (safe, each section is independent)
- **Content Parts**: Read-only, passed to all tasks (safe)
### Dependencies
- **Chapters**: No dependencies between chapters
- **Sections**: No dependencies between sections (each is self-contained)
- **Solution**: All tasks can run truly in parallel
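"Truly in parallel" may still need a cap in practice: if the AI backend limits concurrent requests, launching every task at once can trigger rate limiting. A hedged sketch using a semaphore (the limit value is an assumption, not a measured number):

```python
import asyncio
from typing import Awaitable, List

# Assumed limit; the real backend's concurrency tolerance must be measured
MAX_CONCURRENT_AI_CALLS = 3

async def bounded_gather(coros: List[Awaitable], limit: int = MAX_CONCURRENT_AI_CALLS):
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:  # at most `limit` coroutines await the backend at once
            return await coro

    # Result order still matches input order
    return await asyncio.gather(*(guarded(c) for c in coros))

async def fake_ai_call(i: int) -> int:
    await asyncio.sleep(0)
    return i

values = asyncio.run(bounded_gather([fake_ai_call(i) for i in range(10)]))
```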
## Implementation Steps
### Step 1: Clean Current Code
1. Ensure current sequential implementation is correct
2. Fix any existing bugs
3. Verify all tests pass
### Step 2: Implement Chapter Parallelization
1. Create `_generateSingleChapterSectionsStructure()` method
2. Extract chapter processing logic
3. Refactor `_generateChapterSectionsStructure()` to use parallel processing
4. Test with single chapter
5. Test with multiple chapters
6. Verify order preservation
7. Verify error handling
### Step 3: Implement Section Parallelization
1. Create `_processSingleSection()` method
2. Extract section processing logic into helper methods
3. Refactor `_fillChapterSections()` to use parallel processing for sections
4. Test with single section
5. Test with multiple sections
6. Test with multiple chapters
7. Verify order preservation
8. Verify error handling
### Step 4: Testing & Validation
1. Test with various document structures
2. Test error scenarios
3. Verify progress reporting accuracy
4. Performance testing (compare sequential vs parallel)
5. Verify final output order matches input order
## Code Structure
### New Methods to Create
```python
async def _generateSingleChapterSectionsStructure(
    self,
    chapter: Dict[str, Any],
    chapterIndex: int,
    chapterId: str,
    chapterLevel: int,
    chapterTitle: str,
    generationHint: str,
    contentPartIds: List[str],
    contentPartInstructions: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    language: str,
    parentOperationId: str
) -> None:
    """Generate sections structure for a single chapter (used for parallel processing)."""
    # Extract logic from current sequential loop
    # Modify chapter dict in place
    # Handle errors internally, raise if critical
    ...

async def _processSingleSection(
    self,
    section: Dict[str, Any],
    sectionIndex: int,
    totalSections: int,
    chapterIndex: int,
    totalChapters: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentParts: List[ContentPart],
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str,
    calculateOverallProgress: Callable
) -> List[Dict[str, Any]]:
    """Process a single section and return its elements."""
    # Extract logic from current sequential loop
    # Return elements list
    # Return error element on failure (don't raise)
    ...

async def _processSectionAggregation(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section with aggregation (multiple parts together)."""
    # Extract aggregation logic
    # Return elements list
    ...

async def _processSectionGeneration(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentType: str,
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section generation without content parts (only generationHint)."""
    # Extract generation logic
    # Return elements list
    ...

async def _processSectionParts(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    contentType: str,
    useAiCall: bool,
    generationHint: str,
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process individual parts in a section."""
    # Extract individual part processing logic
    # Return elements list
    ...
```
### Modified Methods
```python
async def _generateChapterSectionsStructure(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    parentOperationId: str
) -> Dict[str, Any]:
    """Generate sections structure for all chapters in parallel."""
    # Collect chapters with indices
    # Create tasks
    # Execute in parallel
    # Process results in order
    # Update progress
    ...

async def _fillChapterSections(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    fillOperationId: str
) -> Dict[str, Any]:
    """Fill sections with content, processing sections in parallel within each chapter."""
    # Process chapters sequentially
    # For each chapter, process sections in parallel
    # Maintain order
    # Update progress
    ...
```
## Testing Strategy
### Unit Tests
1. Test `_generateSingleChapterSectionsStructure` independently
2. Test `_processSingleSection` independently
3. Test helper methods independently
### Integration Tests
1. Test parallel chapter processing with multiple chapters
2. Test parallel section processing with multiple sections
3. Test error handling (one chapter/section fails)
4. Test order preservation
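The "one chapter/section fails" check can be sketched as follows: the failing task must surface as an exception object in its slot while its siblings still produce results in input order.

```python
import asyncio

# Minimal failure-isolation check against a gather-based implementation
async def process_section(s: str) -> str:
    if s == "bad":
        raise RuntimeError("boom")
    return f"ok:{s}"

async def run_all(sections):
    return await asyncio.gather(
        *(process_section(s) for s in sections), return_exceptions=True
    )

res = asyncio.run(run_all(["a", "bad", "c"]))
```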
### Performance Tests
1. Measure sequential vs parallel execution time
2. Verify parallel processing is faster
3. Check resource usage (memory, CPU)
## Risk Mitigation
### Risks
1. **Order not preserved**: Use `zip` with original order
2. **Race conditions**: No shared mutable state between tasks
3. **Progress reporting incorrect**: Update progress when tasks complete
4. **Errors not handled**: Use `return_exceptions=True` and check results
5. **Performance degradation**: Test and measure, fallback to sequential if needed
### Safety Measures
1. Keep sequential implementation as fallback (commented out)
2. Add feature flag to enable/disable parallel processing
3. Extensive logging for debugging
4. Gradual rollout (test with small datasets first)
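A minimal sketch of the feature flag plus sequential fallback; the env-var name `ENABLE_PARALLEL_GENERATION` is hypothetical, and the fallback deliberately mirrors the error semantics of `return_exceptions=True` so both paths return the same shape:

```python
import asyncio
import os

# Hypothetical flag name; the real project would wire this into its own config
PARALLEL_ENABLED = os.getenv("ENABLE_PARALLEL_GENERATION", "true").lower() == "true"

async def process_chapters(chapters, process_one):
    if PARALLEL_ENABLED:
        return await asyncio.gather(
            *(process_one(c) for c in chapters), return_exceptions=True
        )
    # Sequential fallback: one chapter at a time, exceptions collected not raised
    results = []
    for chapter in chapters:
        try:
            results.append(await process_one(chapter))
        except Exception as exc:
            results.append(exc)
    return results

async def _double(c):
    return c * 2

out = asyncio.run(process_chapters([1, 2, 3], _double))
```

Because both branches return the same result shape, the flag can be flipped in production without changing any caller.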
## Migration Path
1. **Phase 1**: Implement chapter parallelization, test thoroughly
2. **Phase 2**: Implement section parallelization, test thoroughly
3. **Phase 3**: Enable both in production with monitoring
4. **Phase 4**: Remove sequential fallback code (if stable)
## Notes
- All async methods must use `await` correctly
- Progress updates happen asynchronously (may appear out of order in logs)
- Final result order is guaranteed by processing results in order
- Error handling is per-task, not global
- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts)


@@ -1,126 +0,0 @@
# Refactoring Plan for mainServiceAi.py
## Goal
Split the 3000-line module into manageable submodules (~300-600 lines per module).
## Proposed Structure
### Already created:
1. ✅ `subResponseParsing.py` - ResponseParser class
2. ✅ `subDocumentIntents.py` - DocumentIntentAnalyzer class
### Still to create:
3. `subContentExtraction.py` - ContentExtractor class
- `extractAndPrepareContent()` (~490 lines)
- `extractTextFromImage()` (~55 lines)
- `processTextContentWithAi()` (~72 lines)
- `_isBinary()` (~10 lines)
4. `subStructureGeneration.py` - StructureGenerator class
- `generateStructure()` (~60 lines)
- `_buildStructurePrompt()` (~130 lines)
5. `subStructureFilling.py` - StructureFiller class
- `fillStructure()` (~290 lines)
- `_buildSectionGenerationPrompt()` (~185 lines)
- `_findContentPartById()` (~5 lines)
- `_needsAggregation()` (~20 lines)
6. `subAiCallLooping.py` - AiCallLooper class
- `callAiWithLooping()` (~405 lines)
- `_defineKpisFromPrompt()` (~92 lines)
## Refactoring Steps for mainServiceAi.py
### Step 1: Extend submodule initialization
```python
def _initializeSubmodules(self):
    """Initialize all submodules after aiObjects is ready."""
    if self.aiObjects is None:
        raise RuntimeError("aiObjects must be initialized before initializing submodules")
    if self.extractionService is None:
        logger.info("Initializing ExtractionService...")
        self.extractionService = ExtractionService(self.services)
    # Initialize new submodules
    from modules.services.serviceAi.subResponseParsing import ResponseParser
    from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer
    from modules.services.serviceAi.subContentExtraction import ContentExtractor
    from modules.services.serviceAi.subStructureGeneration import StructureGenerator
    from modules.services.serviceAi.subStructureFilling import StructureFiller
    if not hasattr(self, 'responseParser'):
        self.responseParser = ResponseParser(self.services)
    if not hasattr(self, 'intentAnalyzer'):
        self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self)
    if not hasattr(self, 'contentExtractor'):
        self.contentExtractor = ContentExtractor(self.services, self)
    if not hasattr(self, 'structureGenerator'):
        self.structureGenerator = StructureGenerator(self.services, self)
    if not hasattr(self, 'structureFiller'):
        self.structureFiller = StructureFiller(self.services, self)
```
### Step 2: Replace methods with delegation
**Example for response parsing:**
```python
# OLD:
def _extractSectionsFromResponse(self, ...):
    # 100 lines of code
    ...
# NEW:
def _extractSectionsFromResponse(self, ...):
    return self.responseParser.extractSectionsFromResponse(...)
```
**Example for document intents:**
```python
# OLD:
async def _clarifyDocumentIntents(self, ...):
    # 100 lines of code
    ...
# NEW:
async def _clarifyDocumentIntents(self, ...):
    return await self.intentAnalyzer.clarifyDocumentIntents(...)
```
### Step 3: Keep helper methods
Small helper methods remain in the main module:
- `_buildPromptWithPlaceholders()`
- `_getIntentForDocument()`
- `_shouldSkipContentPart()`
- `_determineDocumentName()`
### Step 4: Leave the public API unchanged
The public API (`callAiPlanning`, `callAiContent`) remains unchanged.
## Expected Resulting Sizes
- `mainServiceAi.py`: ~800-1000 lines (down from 3016)
- `subResponseParsing.py`: ~200 lines ✅
- `subDocumentIntents.py`: ~300 lines ✅
- `subContentExtraction.py`: ~600 lines
- `subStructureGeneration.py`: ~200 lines
- `subStructureFilling.py`: ~400 lines
- `subAiCallLooping.py`: ~500 lines
**Total: ~3000 lines** (the same amount, but better organized)
## Benefits
1. **Clarity**: Each module has a single clear responsibility
2. **Maintainability**: Changes stay localized
3. **Testability**: Modules can be tested individually
4. **Reusability**: Modules can be reused in other contexts


@@ -676,6 +676,7 @@ Respond with ONLY a JSON object in this exact format:
)
# Step 5D: Fill structure
# Language will be extracted from services (user intention analysis) in fillStructure
filledStructure = await self._fillStructure(
    structure,
    contentParts or [],


@@ -14,7 +14,7 @@ from typing import Dict, Any, List, Optional, Callable
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState
from modules.datamodels.datamodelExtraction import ContentPart
from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
logger = logging.getLogger(__name__)
@@ -192,6 +192,38 @@ class AiCallLooper:
# Store raw response for continuation (even if broken)
lastRawResponse = result
# Check if this is section content generation (has "elements" not "sections")
# Section content generation returns JSON with "elements" array, not document structure with "sections"
isSectionContentGeneration = False
parsedJsonForSection = None
extractedJsonForSection = None
try:
    extractedJsonForSection = extractJsonString(result)
    parsedJson, parseError, _ = tryParseJson(extractedJsonForSection)
    if parseError is None and parsedJson:
        parsedJsonForSection = parsedJson
        # Check if JSON has "elements" (section content) or "sections" (document structure)
        if isinstance(parsedJson, dict):
            if "elements" in parsedJson:
                isSectionContentGeneration = True
        elif isinstance(parsedJson, list) and len(parsedJson) > 0:
            # Check if it's a list of elements (section content format)
            if isinstance(parsedJson[0], dict) and "type" in parsedJson[0]:
                isSectionContentGeneration = True
except Exception:
    pass
if isSectionContentGeneration:
    # This is section content generation - return the JSON directly
    # No need to extract sections, just return the complete JSON string
    logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
    if iterationOperationId:
        self.services.chat.progressLogFinish(iterationOperationId, True)
    # Write final result
    final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
    self.services.utils.writeDebugFile(final_json, f"{debugPrefix}_final_result")
    return final_json
# Extract sections from response (handles both valid and broken JSON)
# Only for document generation (JSON responses)
# CRITICAL: Pass allSections and accumulationState to enable string accumulation

File diff suppressed because it is too large


@@ -76,7 +76,30 @@ class StructureGenerator:
)
# Parse structure
# Use tryParseJson which handles malformed JSON and unterminated strings
extractedJson = self.services.utils.jsonExtractString(aiResponse)
parsedJson, parseError, cleanedJson = self.services.utils.jsonTryParse(extractedJson)
if parseError is not None:
    # Try to repair broken JSON (handles unterminated strings, incomplete structures, etc.)
    logger.warning(f"Initial JSON parsing failed: {str(parseError)}. Attempting repair...")
    from modules.shared import jsonUtils
    repairedJson = jsonUtils.repairBrokenJson(extractedJson)
    if repairedJson:
        # Try parsing repaired JSON
        parsedJson, parseError, _ = self.services.utils.jsonTryParse(json.dumps(repairedJson))
        if parseError is None:
            logger.info("Successfully repaired and parsed JSON structure")
            structure = parsedJson
        else:
            logger.error(f"Failed to parse repaired JSON: {str(parseError)}")
            raise ValueError(f"Failed to parse JSON structure after repair: {str(parseError)}")
    else:
        logger.error(f"Failed to repair JSON. Parse error: {str(parseError)}")
        logger.error(f"Cleaned JSON preview (first 500 chars): {cleanedJson[:500]}")
        raise ValueError(f"Failed to parse JSON structure: {str(parseError)}")
else:
    structure = parsedJson
# Finish chat log
self.services.chat.progressLogFinish(structureOperationId, True)
@@ -145,11 +168,17 @@
if not contentPartsIndex:
    contentPartsIndex = "\n(No content parts available)"
# Default language; can be enhanced later with language detection from the user prompt
language = "en"  # Default language
prompt = f"""USER REQUEST (for context):
```
{userPrompt}
```
LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
AVAILABLE CONTENT PARTS:
{contentPartsIndex}