# Parallel Processing Refactoring Concept

## Current State (Sequential)

### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`)
- **Current**: Processes chapters sequentially, one after another
- **Flow**:
  1. Iterate through documents
  2. For each document, iterate through chapters
  3. For each chapter, generate sections structure using AI
  4. Update progress after each chapter

### Section Content Generation (`_fillChapterSections`)
- **Current**: Processes chapters sequentially, sections within each chapter sequentially
- **Flow**:
  1. Iterate through documents
  2. For each document, iterate through chapters
  3. For each chapter, iterate through sections
  4. For each section, generate content using AI
  5. Update progress after each section
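
The sequential flow above can be sketched roughly as follows. This is a minimal illustration, not the actual implementation: the dict keys (`chapters`, `sections`, `elements`) and the helper `generateContent` are assumptions made for the example.

```python
import asyncio
from typing import Any, Dict, List


async def generateContent(section: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for the AI call that fills one section."""
    await asyncio.sleep(0)  # placeholder for network latency
    return [{"type": "paragraph", "text": f"content for {section['id']}"}]


async def fillSequentially(documents: List[Dict[str, Any]]) -> int:
    """Visit every section one at a time and return how many were filled."""
    done = 0
    for document in documents:
        for chapter in document["chapters"]:
            for section in chapter["sections"]:
                # Each await finishes before the next section starts.
                section["elements"] = await generateContent(section)
                done += 1  # progress would be updated here, once per section
    return done


docs = [{"chapters": [{"sections": [{"id": "s1"}, {"id": "s2"}]},
                      {"sections": [{"id": "s3"}]}]}]
filled = asyncio.run(fillSequentially(docs))
```

Because every `await` completes before the next one begins, total time is roughly the sum of all AI call durations, which is what the parallel design aims to reduce.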

## Desired State (Parallel)

### Chapter Sections Structure Generation
- **Target**: Process all chapters in parallel
- **Requirements**:
  - Maintain chapter order in final result
  - Each chapter can be processed independently
  - Progress updates should reflect parallel processing
  - Errors in one chapter should not stop others

### Section Content Generation
- **Target**: Process sections within each chapter in parallel
- **Requirements**:
  - Maintain section order within each chapter
  - Sections within a chapter can be processed independently
  - Chapters still processed sequentially (to maintain order)
  - Progress updates should reflect parallel processing
  - Errors in one section should not stop others

## Implementation Strategy

### Phase 1: Chapter Sections Structure Generation Parallelization

#### Step 1.1: Extract Single Chapter Processing
- **Create**: `_generateSingleChapterSectionsStructure()` method
- **Purpose**: Process one chapter independently
- **Parameters**:
  - `chapter`: Chapter dict
  - `chapterIndex`: Index for ordering
  - `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata
  - `generationHint`: Generation instructions
  - `contentPartIds`, `contentPartInstructions`: Content part info
  - `contentParts`: Full content parts list
  - `userPrompt`: User's original prompt
  - `language`: Language for generation
  - `parentOperationId`: For progress logging
- **Returns**: None (modifies chapter dict in place)
- **Error Handling**: Logs errors, raises exception to be caught by caller

#### Step 1.2: Refactor Main Method
- **Modify**: `_generateChapterSectionsStructure()`
- **Changes**:
  1. Collect all chapters with their indices
  2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure`
  3. Use `asyncio.gather()` to execute all tasks in parallel
  4. Process results in order (using `zip` with original order)
  5. Handle errors per chapter (don't fail entire operation)
  6. Update progress after each chapter completes
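
The changes listed in Step 1.2 could look roughly like the sketch below. `generateSingleChapter` is a simplified, hypothetical stand-in for `_generateSingleChapterSectionsStructure` (far fewer parameters), and emptying a failed chapter's `sections` is an assumed fallback, not a documented behavior.

```python
import asyncio
from typing import Any, Dict, List


async def generateSingleChapter(chapter: Dict[str, Any], chapterIndex: int) -> None:
    """Hypothetical stand-in for _generateSingleChapterSectionsStructure."""
    await asyncio.sleep(0)  # placeholder for the AI call
    if chapter.get("fail"):
        raise ValueError(f"chapter {chapterIndex} failed")
    # Modify the chapter dict in place, as Step 1.1 describes.
    chapter["sections"] = [{"id": f"{chapter['id']}-s1"}]


async def generateAllChapters(chapters: List[Dict[str, Any]]) -> List[str]:
    """Run one task per chapter and return one status line per chapter, in order."""
    tasks = [generateSingleChapter(ch, i) for i, ch in enumerate(chapters)]
    # return_exceptions=True turns a raised exception into a result value,
    # so one failing chapter does not abort the others.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    total = len(chapters)
    statuses = []
    for position, (chapter, result) in enumerate(zip(chapters, results), start=1):
        if isinstance(result, Exception):
            chapter["sections"] = []  # assumed fallback: leave the chapter empty
            statuses.append(f"Chapter {position}/{total} error")
        else:
            statuses.append(f"Chapter {position}/{total} completed")
    return statuses


chapters = [{"id": "c1"}, {"id": "c2", "fail": True}, {"id": "c3"}]
statuses = asyncio.run(generateAllChapters(chapters))
```

`asyncio.gather()` returns results in the order the awaitables were passed in, which is why zipping them with the original chapter list is enough to keep chapter order.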

#### Step 1.3: Progress Reporting
- **Maintain**: Overall progress tracking
- **Update**: Progress after each chapter completes (not sequentially)
- **Format**: "Chapter X/Y completed" or "Chapter X/Y error"
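
One way to report progress as tasks finish, rather than in list order, is to wrap each task in a small tracker. The sketch below is an assumption about how this could be done; `CompletionCounter`, `track`, and `fakeChapter` are hypothetical names, and X in the message is the number of chapters finished so far, not the chapter's position.

```python
import asyncio
from typing import Awaitable, Callable, List


class CompletionCounter:
    """Counts finished tasks. No lock is used because asyncio tasks only
    switch at await points, and the counter update contains none."""

    def __init__(self, total: int, report: Callable[[str], None]) -> None:
        self.total = total
        self.completed = 0
        self.report = report

    def _finish(self, status: str) -> None:
        self.completed += 1
        self.report(f"Chapter {self.completed}/{self.total} {status}")

    async def track(self, work: Awaitable[None]) -> None:
        try:
            await work
        except Exception:
            self._finish("error")
            raise  # let the caller (for example gather) see the failure
        else:
            self._finish("completed")


async def fakeChapter(delay: float, fail: bool = False) -> None:
    """Hypothetical chapter task that just waits, then optionally fails."""
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError("boom")


async def main() -> List[str]:
    messages: List[str] = []
    counter = CompletionCounter(total=3, report=messages.append)
    jobs = [
        counter.track(fakeChapter(0.03)),
        counter.track(fakeChapter(0.01, fail=True)),
        counter.track(fakeChapter(0.02)),
    ]
    await asyncio.gather(*jobs, return_exceptions=True)
    return messages


messages = asyncio.run(main())
```

The messages arrive in completion order, so the failing second chapter is reported first here even though it is second in the task list.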

### Phase 2: Section Content Generation Parallelization

#### Step 2.1: Extract Single Section Processing
- **Create**: `_processSingleSection()` method
- **Purpose**: Process one section independently
- **Parameters**:
  - `section`: Section dict
  - `sectionIndex`: Index for ordering
  - `totalSections`: Total sections in chapter
  - `chapterIndex`: Chapter index
  - `totalChapters`: Total chapters
  - `chapterId`: Chapter ID
  - `chapterOperationId`: Chapter progress operation ID
  - `fillOperationId`: Overall fill operation ID
  - `contentParts`: Full content parts list
  - `userPrompt`: User's original prompt
  - `all_sections_list`: All sections for context
  - `language`: Language for generation
  - `calculateOverallProgress`: Function to calculate overall progress
- **Returns**: `List[Dict[str, Any]]` (elements for the section)
- **Error Handling**: Returns error element instead of raising
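
The "return an error element instead of raising" contract might be sketched like this. The parameter list is heavily reduced, `generateSectionElements` is a hypothetical stand-in for the real processing paths, and the shape of the error element is an assumption.

```python
import asyncio
from typing import Any, Dict, List


async def generateSectionElements(section: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in for the real generation paths."""
    await asyncio.sleep(0)  # placeholder for the AI call
    if not section.get("title"):
        raise ValueError("section has no title")
    return [{"type": "paragraph", "content": f"Text for {section['title']}"}]


async def processSingleSection(section: Dict[str, Any], sectionIndex: int) -> List[Dict[str, Any]]:
    """Return the section's elements, or a single error element on failure."""
    try:
        return await generateSectionElements(section)
    except Exception as exc:
        # The error element's fields are assumed for this sketch.
        return [{"type": "error", "sectionIndex": sectionIndex, "message": str(exc)}]


ok = asyncio.run(processSingleSection({"title": "Intro"}, 0))
failed = asyncio.run(processSingleSection({}, 1))
```

Returning a value on failure means the caller always gets a list it can assign to the section, and the failure stays visible in the output.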

#### Step 2.2: Extract Section Processing Logic
- **Create**: Helper methods for different processing paths:
  - `_processSectionAggregation()`: Handle aggregation path (multiple parts)
  - `_processSectionGeneration()`: Handle generation without parts (only generationHint)
  - `_processSectionParts()`: Handle individual part processing
- **Purpose**: Keep logic organized and reusable
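
How `_processSingleSection` picks one of these helpers is not spelled out here, so the following is only a guess at the dispatch. The `aggregate` flag and the exact rules are assumptions made for illustration; only the three helper names come from the list above.

```python
from typing import Any, Dict


def chooseProcessingPath(section: Dict[str, Any]) -> str:
    """Return the name of the helper that would handle this section.

    Assumed rules (not confirmed by the design):
    - no content parts            -> generation from generationHint only
    - 'aggregate' flag + >1 parts -> aggregation of all parts together
    - anything else               -> per-part processing
    """
    partIds = section.get("contentPartIds", [])
    if not partIds:
        return "_processSectionGeneration"
    if section.get("aggregate") and len(partIds) > 1:
        return "_processSectionAggregation"
    return "_processSectionParts"


hintOnly = chooseProcessingPath({"generationHint": "Summarize"})
aggregated = chooseProcessingPath({"contentPartIds": ["p1", "p2"], "aggregate": True})
perPart = chooseProcessingPath({"contentPartIds": ["p1"]})
```

Keeping the decision in one small function makes each path testable on its own, which matches the stated purpose of the helpers.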

#### Step 2.3: Refactor Main Method
- **Modify**: `_fillChapterSections()`
- **Changes**:
  1. Keep sequential chapter processing (maintains order)
  2. For each chapter, collect all sections with indices
  3. Create async tasks for each section using `_processSingleSection`
  4. Use `asyncio.gather()` to execute all section tasks in parallel
  5. Process results in order (using `zip` with original order)
  6. Assign elements to sections in correct order
  7. Update progress after each section completes
  8. Handle errors per section (don't fail entire chapter)
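
A minimal sketch of the Step 2.3 loop, assuming a simplified `processSingleSection` with a single parameter and a hypothetical `delay` key that makes later sections finish first:

```python
import asyncio
from typing import Any, Dict, List


async def processSingleSection(section: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Hypothetical stand-in; the delay simulates AI calls of varying length."""
    await asyncio.sleep(section["delay"])
    return [{"type": "paragraph", "content": section["id"]}]


async def fillChapterSections(chapters: List[Dict[str, Any]]) -> None:
    for chapter in chapters:  # chapters stay sequential
        sections = chapter["sections"]
        tasks = [processSingleSection(s) for s in sections]
        # Sections of this chapter run concurrently; results keep task order.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for section, result in zip(sections, results):
            if isinstance(result, Exception):
                # Safety net in case a task raises despite Step 2.1's contract.
                result = [{"type": "error", "content": str(result)}]
            section["elements"] = result


chapters = [
    {"sections": [{"id": "1.1", "delay": 0.02}, {"id": "1.2", "delay": 0.0}]},
    {"sections": [{"id": "2.1", "delay": 0.0}]},
]
asyncio.run(fillChapterSections(chapters))
```

Section `1.2` finishes before `1.1` in this run, yet each section still receives its own elements because assignment follows the original list order.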

#### Step 2.4: Progress Reporting
- **Maintain**: Hierarchical progress tracking
- **Update**:
  - Section progress: After each section completes
  - Chapter progress: After all sections in chapter complete
  - Overall progress: After each section/chapter completes
- **Format**: "Chapter X/Y, Section A/B completed"
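
The hierarchical calculation could be as simple as the sketch below. The signature of `calculateOverallProgress` is not given in this document, so the one used here is an assumption, as is the choice to weight all chapters equally; `formatProgress` is a hypothetical helper.

```python
def calculateOverallProgress(
    chapterIndex: int,
    totalChapters: int,
    completedSections: int,
    totalSections: int,
) -> float:
    """Overall progress in [0, 1], assuming every chapter has equal weight."""
    if totalChapters <= 0:
        return 0.0
    # A chapter without sections counts as fully done.
    chapterFraction = completedSections / totalSections if totalSections else 1.0
    return (chapterIndex + chapterFraction) / totalChapters


def formatProgress(
    chapterIndex: int,
    totalChapters: int,
    completedSections: int,
    totalSections: int,
) -> str:
    """Build the message from a zero-based chapter index and a completed count."""
    return (
        f"Chapter {chapterIndex + 1}/{totalChapters}, "
        f"Section {completedSections}/{totalSections} completed"
    )


halfwayThroughFirst = calculateOverallProgress(0, 2, 1, 2)
message = formatProgress(0, 2, 1, 2)
```

Using the completed-section count rather than the section's index keeps the value monotonic even when sections finish out of order.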
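
## Key Considerations

### Order Preservation
- **Chapters**: Must maintain document order → chapters stay sequential during content filling; during structure generation they run in parallel and results are matched back to the original chapter order
- **Sections**: Must maintain their order within the chapter → sections run in parallel, and their results are assigned back in the original section order
- **Solution**: Use `asyncio.gather()` with ordered task list (results are returned in task order, regardless of completion order), then `zip` results with original order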
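
The ordering guarantee can be checked with a small experiment. The task names and delays are made up for the demonstration; the first task is deliberately the slowest.

```python
import asyncio
from typing import List, Tuple


async def work(name: str, delay: float, finished: List[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(name)  # record the order in which tasks actually finish
    return name.upper()


async def main() -> Tuple[List[str], List[str]]:
    names = ["a", "b", "c"]
    delays = [0.03, 0.02, 0.01]  # the first task finishes last
    finished: List[str] = []
    results = await asyncio.gather(
        *(work(n, d, finished) for n, d in zip(names, delays))
    )
    return finished, list(results)


finished, results = asyncio.run(main())
# finished is in completion order, results is in submission order
```

Completion order is reversed, but the results list still lines up with `names`, so `zip(names, results)` pairs each input with its own output.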

### Error Handling
- **Chapters**: Error in one chapter should not stop others
- **Sections**: Error in one section should not stop others
- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)`

### Progress Reporting
- **Challenge**: Progress updates happen out of order
- **Solution**: Update progress when each task completes, not sequentially
- **Format**: Show completed count, not sequential position

### Shared State
- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent)
- **Sections**: Return elements, assign to sections in order (safe, each section is independent)
- **Content Parts**: Read-only, passed to all tasks (safe)

### Dependencies
- **Chapters**: No dependencies between chapters
- **Sections**: No dependencies between sections (each is self-contained)
- **Solution**: All tasks can run concurrently (asyncio overlaps the waiting time of the AI calls on a single thread; it does not add CPU parallelism)

## Implementation Steps

### Step 1: Clean Current Code
1. Ensure current sequential implementation is correct
2. Fix any existing bugs
3. Verify all tests pass

### Step 2: Implement Chapter Parallelization
1. Create `_generateSingleChapterSectionsStructure()` method
2. Extract chapter processing logic
3. Refactor `_generateChapterSectionsStructure()` to use parallel processing
4. Test with single chapter
5. Test with multiple chapters
6. Verify order preservation
7. Verify error handling

### Step 3: Implement Section Parallelization
1. Create `_processSingleSection()` method
2. Extract section processing logic into helper methods
3. Refactor `_fillChapterSections()` to use parallel processing for sections
4. Test with single section
5. Test with multiple sections
6. Test with multiple chapters
7. Verify order preservation
8. Verify error handling

### Step 4: Testing & Validation
1. Test with various document structures
2. Test error scenarios
3. Verify progress reporting accuracy
4. Performance testing (compare sequential vs parallel)
5. Verify final output order matches input order

## Code Structure

### New Methods to Create

```python
from __future__ import annotations  # keeps ContentPart (project type) unevaluated

from typing import Any, Callable, Dict, List


async def _generateSingleChapterSectionsStructure(
    self,
    chapter: Dict[str, Any],
    chapterIndex: int,
    chapterId: str,
    chapterLevel: int,
    chapterTitle: str,
    generationHint: str,
    contentPartIds: List[str],
    contentPartInstructions: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    language: str,
    parentOperationId: str
) -> None:
    """Generate sections structure for a single chapter (used for parallel processing)."""
    # Extract logic from current sequential loop
    # Modify chapter dict in place
    # Handle errors internally, raise if critical


async def _processSingleSection(
    self,
    section: Dict[str, Any],
    sectionIndex: int,
    totalSections: int,
    chapterIndex: int,
    totalChapters: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentParts: List[ContentPart],
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str,
    calculateOverallProgress: Callable
) -> List[Dict[str, Any]]:
    """Process a single section and return its elements."""
    # Extract logic from current sequential loop
    # Return elements list
    # Return error element on failure (don't raise)


async def _processSectionAggregation(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section with aggregation (multiple parts together)."""
    # Extract aggregation logic
    # Return elements list


async def _processSectionGeneration(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentType: str,
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section generation without content parts (only generationHint)."""
    # Extract generation logic
    # Return elements list


async def _processSectionParts(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    contentType: str,
    useAiCall: bool,
    generationHint: str,
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process individual parts in a section."""
    # Extract individual part processing logic
    # Return elements list
```

### Modified Methods

```python
from __future__ import annotations  # keeps ContentPart (project type) unevaluated

from typing import Any, Dict, List


async def _generateChapterSectionsStructure(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    parentOperationId: str
) -> Dict[str, Any]:
    """Generate sections structure for all chapters in parallel."""
    # Collect chapters with indices
    # Create tasks
    # Execute in parallel
    # Process results in order
    # Update progress


async def _fillChapterSections(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    fillOperationId: str
) -> Dict[str, Any]:
    """Fill sections with content, processing sections in parallel within each chapter."""
    # Process chapters sequentially
    # For each chapter, process sections in parallel
    # Maintain order
    # Update progress
```

## Testing Strategy

### Unit Tests
1. Test `_generateSingleChapterSectionsStructure` independently
2. Test `_processSingleSection` independently
3. Test helper methods independently

### Integration Tests
1. Test parallel chapter processing with multiple chapters
2. Test parallel section processing with multiple sections
3. Test error handling (one chapter/section fails)
4. Test order preservation

### Performance Tests
1. Measure sequential vs parallel execution time
2. Verify parallel processing is faster
3. Check resource usage (memory, CPU)
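
A timing comparison could start from something like the sketch below. It uses `asyncio.sleep` as a hypothetical stand-in for the AI call (`fakeAiCall`), so it only shows the shape of the measurement, not realistic numbers.

```python
import asyncio
import time
from typing import Awaitable, Callable, List


async def fakeAiCall(delay: float) -> float:
    """Hypothetical stand-in for one AI request."""
    await asyncio.sleep(delay)
    return delay


async def runSequential(delays: List[float]) -> List[float]:
    return [await fakeAiCall(d) for d in delays]


async def runParallel(delays: List[float]) -> List[float]:
    return list(await asyncio.gather(*(fakeAiCall(d) for d in delays)))


def timed(runner: Callable[[List[float]], Awaitable[List[float]]], delays: List[float]) -> float:
    """Return wall-clock seconds taken by one full run."""
    start = time.perf_counter()
    asyncio.run(runner(delays))
    return time.perf_counter() - start


delays = [0.05] * 4
sequentialSeconds = timed(runSequential, delays)
parallelSeconds = timed(runParallel, delays)
```

With four simulated calls the sequential run takes about the sum of the delays, while the concurrent run takes about the longest single delay. Real measurements should use the actual AI backend, since provider latency will dominate.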

## Risk Mitigation

### Risks
1. **Order not preserved**: Use `zip` with original order
2. **Race conditions**: No shared mutable state between tasks
3. **Progress reporting incorrect**: Update progress when tasks complete
4. **Errors not handled**: Use `return_exceptions=True` and check results
5. **Performance degradation**: Test and measure, fallback to sequential if needed

### Safety Measures
1. Keep sequential implementation as fallback (commented out)
2. Add feature flag to enable/disable parallel processing
3. Extensive logging for debugging
4. Gradual rollout (test with small datasets first)
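
A feature flag for the fallback might look like the following sketch. It assumes the sequential path stays callable rather than commented out, and the environment variable name `PARALLEL_GENERATION` and the helpers `parallelEnabled`, `runAll`, and `shout` are invented for this example.

```python
import asyncio
import os
from typing import Awaitable, Callable, List


def parallelEnabled() -> bool:
    # Hypothetical flag name; parallel processing is on unless set to "0".
    return os.environ.get("PARALLEL_GENERATION", "1") != "0"


async def runAll(
    items: List[str],
    worker: Callable[[str], Awaitable[str]],
    parallel: bool,
) -> List[str]:
    """Run the worker over all items, concurrently or one by one."""
    if parallel:
        return list(await asyncio.gather(*(worker(item) for item in items)))
    return [await worker(item) for item in items]  # sequential fallback


async def shout(text: str) -> str:
    """Hypothetical worker standing in for a chapter or section task."""
    await asyncio.sleep(0)
    return text.upper()


parallelResult = asyncio.run(runAll(["a", "b"], shout, parallel=True))
sequentialResult = asyncio.run(runAll(["a", "b"], shout, parallel=False))
```

Both modes return the same ordered result, which makes the flag safe to flip during a gradual rollout and gives an easy equivalence check for tests.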

## Migration Path

1. **Phase 1**: Implement chapter parallelization, test thoroughly
2. **Phase 2**: Implement section parallelization, test thoroughly
3. **Phase 3**: Enable both in production with monitoring
4. **Phase 4**: Remove sequential fallback code (if stable)

## Notes

- All async methods must use `await` correctly
- Progress updates happen asynchronously (may appear out of order in logs)
- Final result order is guaranteed by processing results in order
- Error handling is per-task, not global
- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts)