# Parallel Processing Refactoring Concept

## Current State (Sequential)

### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`)
- **Current**: Processes chapters sequentially, one after another
- **Flow**:
  1. Iterate through documents
  2. For each document, iterate through chapters
  3. For each chapter, generate sections structure using AI
  4. Update progress after each chapter

### Section Content Generation (`_fillChapterSections`)
- **Current**: Processes chapters sequentially, sections within each chapter sequentially
- **Flow**:
  1. Iterate through documents
  2. For each document, iterate through chapters
  3. For each chapter, iterate through sections
  4. For each section, generate content using AI
  5. Update progress after each section

## Desired State (Parallel)

### Chapter Sections Structure Generation
- **Target**: Process all chapters in parallel
- **Requirements**:
  - Maintain chapter order in final result
  - Each chapter can be processed independently
  - Progress updates should reflect parallel processing
  - Errors in one chapter should not stop others

### Section Content Generation
- **Target**: Process sections within each chapter in parallel
- **Requirements**:
  - Maintain section order within each chapter
  - Sections within a chapter can be processed independently
  - Chapters still processed sequentially (to maintain order)
  - Progress updates should reflect parallel processing
  - Errors in one section should not stop others

## Implementation Strategy

### Phase 1: Chapter Sections Structure Generation Parallelization

#### Step 1.1: Extract Single Chapter Processing
- **Create**: `_generateSingleChapterSectionsStructure()` method
- **Purpose**: Process one chapter independently
- **Parameters**:
  - `chapter`: Chapter dict
  - `chapterIndex`: Index for ordering
  - `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata
  - `generationHint`: Generation instructions
  - `contentPartIds`, `contentPartInstructions`: Content part info
  - `contentParts`: Full content parts list
  - `userPrompt`: User's original prompt
  - `language`: Language for generation
  - `parentOperationId`: For progress logging
- **Returns**: None (modifies chapter dict in place)
- **Error Handling**: Logs errors, then raises the exception so the caller can handle it per chapter

#### Step 1.2: Refactor Main Method
- **Modify**: `_generateChapterSectionsStructure()`
- **Changes**:
  1. Collect all chapters with their indices
  2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure`
  3. Use `asyncio.gather()` to execute all tasks in parallel
  4. Process results in order (using `zip` with original order)
  5. Handle errors per chapter (don't fail entire operation)
  6. Update progress after each chapter completes

#### Step 1.3: Progress Reporting
- **Maintain**: Overall progress tracking
- **Update**: Progress after each chapter completes (in completion order, not chapter order)
- **Format**: "Chapter X/Y completed" or "Chapter X/Y error"

### Phase 2: Section Content Generation Parallelization

#### Step 2.1: Extract Single Section Processing
- **Create**: `_processSingleSection()` method
- **Purpose**: Process one section independently
- **Parameters**:
  - `section`: Section dict
  - `sectionIndex`: Index for ordering
  - `totalSections`: Total sections in chapter
  - `chapterIndex`: Chapter index
  - `totalChapters`: Total chapters
  - `chapterId`: Chapter ID
  - `chapterOperationId`: Chapter progress operation ID
  - `fillOperationId`: Overall fill operation ID
  - `contentParts`: Full content parts list
  - `userPrompt`: User's original prompt
  - `all_sections_list`: All sections for context
  - `language`: Language for generation
  - `calculateOverallProgress`: Function to calculate overall progress
- **Returns**: `List[Dict[str, Any]]` (elements for the section)
- **Error Handling**: Returns error element instead of raising

#### Step 2.2: Extract Section Processing Logic
- **Create**: Helper methods for different processing paths:
  - `_processSectionAggregation()`: Handle aggregation path (multiple parts)
  - `_processSectionGeneration()`: Handle generation without parts (only generationHint)
  - `_processSectionParts()`: Handle individual part processing
- **Purpose**: Keep logic organized and reusable

#### Step 2.3: Refactor Main Method
- **Modify**: `_fillChapterSections()`
- **Changes**:
  1. Keep sequential chapter processing (maintains order)
  2. For each chapter, collect all sections with indices
  3. Create async tasks for each section using `_processSingleSection`
  4. Use `asyncio.gather()` to execute all section tasks in parallel
  5. Process results in order (using `zip` with original order)
  6. Assign elements to sections in correct order
  7. Update progress after each section completes
  8. Handle errors per section (don't fail entire chapter)

#### Step 2.4: Progress Reporting
- **Maintain**: Hierarchical progress tracking
- **Update**:
  - Section progress: After each section completes
  - Chapter progress: After all sections in chapter complete
  - Overall progress: After each section/chapter completes
- **Format**: "Chapter X/Y, Section A/B completed"

## Key Considerations

### Order Preservation
- **Chapters**: Must maintain document order → structure generation runs chapters in parallel and restores order from the ordered task list; content generation still processes chapters sequentially
- **Sections**: Must maintain their order within each chapter → sections run in parallel and their results are assigned back in the original order
- **Solution**: Use `asyncio.gather()` with ordered task list, then `zip` results with original order (`asyncio.gather()` returns results in the order the awaitables were passed, regardless of completion order)

### Error Handling
- **Chapters**: Error in one chapter should not stop others
- **Sections**: Error in one section should not stop others
- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)`

### Progress Reporting
- **Challenge**: Progress updates happen out of order
- **Solution**: Update progress when each task completes, not sequentially
- **Format**: Show completed count, not sequential position

### Shared State
- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent)
- **Sections**: Return elements, assign to sections in order (safe, each section is independent)
- **Content Parts**: Read-only, passed to all tasks (safe)

### Dependencies
- **Chapters**: No dependencies between chapters
- **Sections**: No dependencies between sections (each is self-contained)
- **Solution**: All tasks can run concurrently, with no ordering constraints between them

## Implementation Steps

### Step 1: Clean Current Code
1. Ensure current sequential implementation is correct
2. Fix any existing bugs
3. Verify all tests pass

### Step 2: Implement Chapter Parallelization
1. Create `_generateSingleChapterSectionsStructure()` method
2. Extract chapter processing logic
3. Refactor `_generateChapterSectionsStructure()` to use parallel processing
4. Test with single chapter
5. Test with multiple chapters
6. Verify order preservation
7. Verify error handling

### Step 3: Implement Section Parallelization
1. Create `_processSingleSection()` method
2. Extract section processing logic into helper methods
3. Refactor `_fillChapterSections()` to use parallel processing for sections
4. Test with single section
5. Test with multiple sections
6. Test with multiple chapters
7. Verify order preservation
8. Verify error handling

### Step 4: Testing & Validation
1. Test with various document structures
2. Test error scenarios
3. Verify progress reporting accuracy
4. Performance testing (compare sequential vs parallel)
5. Verify final output order matches input order

## Code Structure

### New Methods to Create

```python
# Typing names used by the signatures below.
# ContentPart is the project's own content part type.
from typing import Any, Callable, Dict, List


async def _generateSingleChapterSectionsStructure(
    self,
    chapter: Dict[str, Any],
    chapterIndex: int,
    chapterId: str,
    chapterLevel: int,
    chapterTitle: str,
    generationHint: str,
    contentPartIds: List[str],
    contentPartInstructions: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    language: str,
    parentOperationId: str
) -> None:
    """Generate sections structure for a single chapter (used for parallel processing)."""
    # Extract logic from current sequential loop
    # Modify chapter dict in place
    # Log errors, then re-raise so the caller can handle them per chapter


async def _processSingleSection(
    self,
    section: Dict[str, Any],
    sectionIndex: int,
    totalSections: int,
    chapterIndex: int,
    totalChapters: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentParts: List[ContentPart],
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str,
    calculateOverallProgress: Callable
) -> List[Dict[str, Any]]:
    """Process a single section and return its elements."""
    # Extract logic from current sequential loop
    # Return elements list
    # Return error element on failure (don't raise)


async def _processSectionAggregation(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section with aggregation (multiple parts together)."""
    # Extract aggregation logic
    # Return elements list


async def _processSectionGeneration(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentType: str,
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section generation without content parts (only generationHint)."""
    # Extract generation logic
    # Return elements list


async def _processSectionParts(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    contentType: str,
    useAiCall: bool,
    generationHint: str,
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process individual parts in a section."""
    # Extract individual part processing logic
    # Return elements list
```

### Modified Methods

```python
async def _generateChapterSectionsStructure(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    parentOperationId: str
) -> Dict[str, Any]:
    """Generate sections structure for all chapters in parallel."""
    # Collect chapters with indices
    # Create tasks
    # Execute in parallel
    # Process results in order
    # Update progress


async def _fillChapterSections(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    fillOperationId: str
) -> Dict[str, Any]:
    """Fill sections with content, processing sections in parallel within each chapter."""
    # Process chapters sequentially
    # For each chapter, process sections in parallel
    # Maintain order
    # Update progress
```

## Testing Strategy

### Unit Tests
1. Test `_generateSingleChapterSectionsStructure` independently
2. Test `_processSingleSection` independently
3. Test helper methods independently

### Integration Tests
1. Test parallel chapter processing with multiple chapters
2. Test parallel section processing with multiple sections
3. Test error handling (one chapter/section fails)
4. Test order preservation

### Performance Tests
1. Measure sequential vs parallel execution time
2. Verify parallel processing is faster
3. Check resource usage (memory, CPU)

## Risk Mitigation

### Risks
1. **Order not preserved**: Use `zip` with original order
2. **Race conditions**: No shared mutable state between tasks
3. **Progress reporting incorrect**: Update progress when tasks complete
4. **Errors not handled**: Use `return_exceptions=True` and check results
5. **Performance degradation**: Test and measure, fallback to sequential if needed

### Safety Measures
1. Keep sequential implementation as fallback (commented out)
2. Add feature flag to enable/disable parallel processing
3. Extensive logging for debugging
4. Gradual rollout (test with small datasets first)

## Migration Path
1. **Phase 1**: Implement chapter parallelization, test thoroughly
2. **Phase 2**: Implement section parallelization, test thoroughly
3. **Phase 3**: Enable both in production with monitoring
4. **Phase 4**: Remove sequential fallback code (if stable)

## Notes
- Every coroutine call must be awaited, directly or via `asyncio.gather()`; a coroutine that is never awaited never runs
- Progress updates happen asynchronously (may appear out of order in logs)
- Final result order is guaranteed by processing results in order
- Error handling is per-task, not global
- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts)
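
## Example Sketch

The pattern this concept relies on (`asyncio.gather()` with `return_exceptions=True`, `zip` against the original list, and completion-count progress) can be sketched as below. This is a minimal illustration, not the actual implementation: `generate_structure`, `process_chapters`, `tracked`, and the `delay`/`fail` dict keys are hypothetical stand-ins, and `asyncio.sleep` simulates the AI call.

```python
import asyncio
from typing import Any, Dict, List, Tuple


async def generate_structure(chapter: Dict[str, Any]) -> None:
    """Hypothetical stand-in for _generateSingleChapterSectionsStructure."""
    await asyncio.sleep(chapter["delay"])  # simulates the awaited AI call
    if chapter.get("fail"):
        raise RuntimeError(f"generation failed for {chapter['id']}")
    # Each task only touches its own chapter dict (no shared mutable state).
    chapter["sections"] = [f"{chapter['id']}-s1"]


async def process_chapters(
    chapters: List[Dict[str, Any]],
) -> Tuple[List[str], List[str]]:
    """Run all chapters concurrently; return (statuses in input order, progress log)."""
    completed = 0
    progress: List[str] = []

    async def tracked(chapter: Dict[str, Any]) -> None:
        # Progress is reported when a task finishes, as a completed count.
        nonlocal completed
        try:
            await generate_structure(chapter)
        finally:
            completed += 1
            progress.append(f"{completed}/{len(chapters)} chapters finished")

    # gather() returns results in the order the awaitables were passed in,
    # and return_exceptions=True turns a failure into a result value.
    results = await asyncio.gather(
        *(tracked(chapter) for chapter in chapters),
        return_exceptions=True,
    )

    statuses: List[str] = []
    for chapter, result in zip(chapters, results):
        if isinstance(result, Exception):
            chapter["error"] = str(result)
            statuses.append(f"{chapter['id']}: error")
        else:
            statuses.append(f"{chapter['id']}: completed")
    return statuses, progress
```

Calling `asyncio.run(process_chapters(chapters))` with a list of chapter dicts returns the statuses in input order even when later chapters finish first. The same shape would apply to sections within a chapter, with the task returning its elements list instead of modifying a dict in place.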