Merge pull request #79 from valueonag/feat/control-user-access-budget

Feat/control user access budget
Patrick Motsch 2025-12-30 11:18:56 +01:00 committed by GitHub
commit f6540d6b5c
12 changed files with 1851 additions and 1058 deletions


@@ -0,0 +1,376 @@
# Parallel Processing Refactoring Concept
## Current State (Sequential)
### Chapter Sections Structure Generation (`_generateChapterSectionsStructure`)
- **Current**: Processes chapters sequentially, one after another
- **Flow**:
1. Iterate through documents
2. For each document, iterate through chapters
3. For each chapter, generate sections structure using AI
4. Update progress after each chapter
### Section Content Generation (`_fillChapterSections`)
- **Current**: Processes chapters sequentially, sections within each chapter sequentially
- **Flow**:
1. Iterate through documents
2. For each document, iterate through chapters
3. For each chapter, iterate through sections
4. For each section, generate content using AI
5. Update progress after each section
## Desired State (Parallel)
### Chapter Sections Structure Generation
- **Target**: Process all chapters in parallel
- **Requirements**:
- Maintain chapter order in final result
- Each chapter can be processed independently
- Progress updates should reflect parallel processing
- Errors in one chapter should not stop others
### Section Content Generation
- **Target**: Process sections within each chapter in parallel
- **Requirements**:
- Maintain section order within each chapter
- Sections within a chapter can be processed independently
- Chapters still processed sequentially (to maintain order)
- Progress updates should reflect parallel processing
- Errors in one section should not stop others
## Implementation Strategy
### Phase 1: Chapter Sections Structure Generation Parallelization
#### Step 1.1: Extract Single Chapter Processing
- **Create**: `_generateSingleChapterSectionsStructure()` method
- **Purpose**: Process one chapter independently
- **Parameters**:
- `chapter`: Chapter dict
- `chapterIndex`: Index for ordering
- `chapterId`, `chapterLevel`, `chapterTitle`: Chapter metadata
- `generationHint`: Generation instructions
- `contentPartIds`, `contentPartInstructions`: Content part info
- `contentParts`: Full content parts list
- `userPrompt`: User's original prompt
- `language`: Language for generation
- `parentOperationId`: For progress logging
- **Returns**: None (modifies chapter dict in place)
- **Error Handling**: Logs errors, raises exception to be caught by caller
#### Step 1.2: Refactor Main Method
- **Modify**: `_generateChapterSectionsStructure()`
- **Changes**:
1. Collect all chapters with their indices
2. Create async tasks for each chapter using `_generateSingleChapterSectionsStructure`
3. Use `asyncio.gather()` to execute all tasks in parallel
4. Process results in order (using `zip` with original order)
5. Handle errors per chapter (don't fail entire operation)
6. Update progress after each chapter completes
#### Step 1.3: Progress Reporting
- **Maintain**: Overall progress tracking
- **Update**: Progress after each chapter completes (not sequentially)
- **Format**: "Chapter X/Y completed" or "Chapter X/Y error"
### Phase 2: Section Content Generation Parallelization
#### Step 2.1: Extract Single Section Processing
- **Create**: `_processSingleSection()` method
- **Purpose**: Process one section independently
- **Parameters**:
- `section`: Section dict
- `sectionIndex`: Index for ordering
- `totalSections`: Total sections in chapter
- `chapterIndex`: Chapter index
- `totalChapters`: Total chapters
- `chapterId`: Chapter ID
- `chapterOperationId`: Chapter progress operation ID
- `fillOperationId`: Overall fill operation ID
- `contentParts`: Full content parts list
- `userPrompt`: User's original prompt
- `all_sections_list`: All sections for context
- `language`: Language for generation
- `calculateOverallProgress`: Function to calculate overall progress
- **Returns**: `List[Dict[str, Any]]` (elements for the section)
- **Error Handling**: Returns error element instead of raising
#### Step 2.2: Extract Section Processing Logic
- **Create**: Helper methods for different processing paths:
- `_processSectionAggregation()`: Handle aggregation path (multiple parts)
- `_processSectionGeneration()`: Handle generation without parts (only generationHint)
- `_processSectionParts()`: Handle individual part processing
- **Purpose**: Keep logic organized and reusable
#### Step 2.3: Refactor Main Method
- **Modify**: `_fillChapterSections()`
- **Changes**:
1. Keep sequential chapter processing (maintains order)
2. For each chapter, collect all sections with indices
3. Create async tasks for each section using `_processSingleSection`
4. Use `asyncio.gather()` to execute all section tasks in parallel
5. Process results in order (using `zip` with original order)
6. Assign elements to sections in correct order
7. Update progress after each section completes
8. Handle errors per section (don't fail entire chapter)
#### Step 2.4: Progress Reporting
- **Maintain**: Hierarchical progress tracking
- **Update**:
- Section progress: After each section completes
- Chapter progress: After all sections in chapter complete
- Overall progress: After each section/chapter completes
- **Format**: "Chapter X/Y, Section A/B completed"
## Key Considerations
### Order Preservation
- **Chapters**: Document order must be preserved in the final result (in Phase 2, chapters additionally run sequentially, which preserves it by construction)
- **Sections**: Section order within each chapter must be preserved even though sections run in parallel
- **Solution**: Use `asyncio.gather()` with an ordered task list; it returns results in submission order, so `zip`-ping the results with the original list restores the correct order
### Error Handling
- **Chapters**: Error in one chapter should not stop others
- **Sections**: Error in one section should not stop others
- **Solution**: Use `return_exceptions=True` in `asyncio.gather()`, check `isinstance(result, Exception)`
### Progress Reporting
- **Challenge**: Progress updates happen out of order
- **Solution**: Update progress when each task completes, not sequentially
- **Format**: Show completed count, not sequential position
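If the completed count should be reported the moment each task finishes, `asyncio.as_completed` is one option (a sketch, not the repo's actual implementation):
```python
import asyncio
from typing import Any, Awaitable, List

async def awaitWithProgress(tasks: List[Awaitable[Any]]) -> None:
    total = len(tasks)
    completed = 0
    for nextDone in asyncio.as_completed(tasks):
        try:
            await nextDone  # re-raises if the underlying task failed
        except Exception as exc:
            print(f"{completed + 1}/{total} error: {exc}")
        else:
            print(f"{completed + 1}/{total} completed")
        completed += 1
```
Since `as_completed` yields results in completion order, ordered results still require the `gather` + `zip` pattern described above.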
### Shared State
- **Chapters**: Modify chapter dicts in place (safe, each chapter is independent)
- **Sections**: Return elements, assign to sections in order (safe, each section is independent)
- **Content Parts**: Read-only, passed to all tasks (safe)
### Dependencies
- **Chapters**: No dependencies between chapters
- **Sections**: No dependencies between sections (each is self-contained)
- **Solution**: All tasks can run truly in parallel
## Implementation Steps
### Step 1: Clean Current Code
1. Ensure current sequential implementation is correct
2. Fix any existing bugs
3. Verify all tests pass
### Step 2: Implement Chapter Parallelization
1. Create `_generateSingleChapterSectionsStructure()` method
2. Extract chapter processing logic
3. Refactor `_generateChapterSectionsStructure()` to use parallel processing
4. Test with single chapter
5. Test with multiple chapters
6. Verify order preservation
7. Verify error handling
### Step 3: Implement Section Parallelization
1. Create `_processSingleSection()` method
2. Extract section processing logic into helper methods
3. Refactor `_fillChapterSections()` to use parallel processing for sections
4. Test with single section
5. Test with multiple sections
6. Test with multiple chapters
7. Verify order preservation
8. Verify error handling
### Step 4: Testing & Validation
1. Test with various document structures
2. Test error scenarios
3. Verify progress reporting accuracy
4. Performance testing (compare sequential vs parallel)
5. Verify final output order matches input order
## Code Structure
### New Methods to Create
```python
async def _generateSingleChapterSectionsStructure(
    self,
    chapter: Dict[str, Any],
    chapterIndex: int,
    chapterId: str,
    chapterLevel: int,
    chapterTitle: str,
    generationHint: str,
    contentPartIds: List[str],
    contentPartInstructions: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    language: str,
    parentOperationId: str
) -> None:
    """Generate sections structure for a single chapter (used for parallel processing)."""
    # Extract logic from current sequential loop
    # Modify chapter dict in place
    # Handle errors internally, raise if critical

async def _processSingleSection(
    self,
    section: Dict[str, Any],
    sectionIndex: int,
    totalSections: int,
    chapterIndex: int,
    totalChapters: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentParts: List[ContentPart],
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str,
    calculateOverallProgress: Callable
) -> List[Dict[str, Any]]:
    """Process a single section and return its elements."""
    # Extract logic from current sequential loop
    # Return elements list
    # Return error element on failure (don't raise)

async def _processSectionAggregation(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section with aggregation (multiple parts together)."""
    # Extract aggregation logic
    # Return elements list

async def _processSectionGeneration(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentType: str,
    userPrompt: str,
    generationHint: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process section generation without content parts (only generationHint)."""
    # Extract generation logic
    # Return elements list

async def _processSectionParts(
    self,
    section: Dict[str, Any],
    sectionId: str,
    sectionTitle: str,
    sectionIndex: int,
    totalSections: int,
    chapterId: str,
    chapterOperationId: str,
    fillOperationId: str,
    contentPartIds: List[str],
    contentFormats: Dict[str, str],
    contentParts: List[ContentPart],
    contentType: str,
    useAiCall: bool,
    generationHint: str,
    userPrompt: str,
    all_sections_list: List[Dict[str, Any]],
    language: str
) -> List[Dict[str, Any]]:
    """Process individual parts in a section."""
    # Extract individual part processing logic
    # Return elements list
```
### Modified Methods
```python
async def _generateChapterSectionsStructure(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    parentOperationId: str
) -> Dict[str, Any]:
    """Generate sections structure for all chapters in parallel."""
    # Collect chapters with indices
    # Create tasks
    # Execute in parallel
    # Process results in order
    # Update progress

async def _fillChapterSections(
    self,
    chapterStructure: Dict[str, Any],
    contentParts: List[ContentPart],
    userPrompt: str,
    fillOperationId: str
) -> Dict[str, Any]:
    """Fill sections with content, processing sections in parallel within each chapter."""
    # Process chapters sequentially
    # For each chapter, process sections in parallel
    # Maintain order
    # Update progress
```
## Testing Strategy
### Unit Tests
1. Test `_generateSingleChapterSectionsStructure` independently
2. Test `_processSingleSection` independently
3. Test helper methods independently
### Integration Tests
1. Test parallel chapter processing with multiple chapters
2. Test parallel section processing with multiple sections
3. Test error handling (one chapter/section fails)
4. Test order preservation
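Order preservation under parallelism can be pinned down with a small test; a sketch assuming `pytest` with the `pytest-asyncio` plugin:
```python
import asyncio
import pytest

@pytest.mark.asyncio
async def test_gather_preserves_submission_order():
    async def work(i: int) -> int:
        await asyncio.sleep(0.01 * (5 - i))  # later tasks finish first
        return i

    results = await asyncio.gather(*(work(i) for i in range(5)))
    assert results == [0, 1, 2, 3, 4]  # order follows submission, not completion
```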
### Performance Tests
1. Measure sequential vs parallel execution time
2. Verify parallel processing is faster
3. Check resource usage (memory, CPU)
## Risk Mitigation
### Risks
1. **Order not preserved**: Use `zip` with original order
2. **Race conditions**: No shared mutable state between tasks
3. **Progress reporting incorrect**: Update progress when tasks complete
4. **Errors not handled**: Use `return_exceptions=True` and check results
5. **Performance degradation**: Test and measure, fallback to sequential if needed
### Safety Measures
1. Keep sequential implementation as fallback (commented out)
2. Add feature flag to enable/disable parallel processing
3. Extensive logging for debugging
4. Gradual rollout (test with small datasets first)
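Point 2 could take roughly this shape (flag name and wiring are illustrative, not taken from the codebase):
```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

PARALLEL_SECTIONS_ENABLED = True  # hypothetical flag, e.g. read from config or an env var

async def fillSections(
    sections: List[Dict[str, Any]],
    processOne: Callable[[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
) -> List[List[Dict[str, Any]]]:
    if PARALLEL_SECTIONS_ENABLED:
        return await asyncio.gather(*(processOne(s) for s in sections))
    # Sequential fallback, kept available while the parallel path is being validated
    return [await processOne(s) for s in sections]
```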
## Migration Path
1. **Phase 1**: Implement chapter parallelization, test thoroughly
2. **Phase 2**: Implement section parallelization, test thoroughly
3. **Phase 3**: Enable both in production with monitoring
4. **Phase 4**: Remove sequential fallback code (if stable)
## Notes
- All async methods must use `await` correctly
- Progress updates happen asynchronously (may appear out of order in logs)
- Final result order is guaranteed by processing results in order
- Error handling is per-task, not global
- No shared mutable state between parallel tasks (read-only contentParts, independent chapter/section dicts)


@@ -1,126 +0,0 @@
# Refactoring Plan for mainServiceAi.py
## Goal
Split the 3000-line module into manageable submodules (~300-600 lines per module).
## Proposed Structure
### Already created:
1. ✅ `subResponseParsing.py` - ResponseParser class
2. ✅ `subDocumentIntents.py` - DocumentIntentAnalyzer class
### Still to create:
3. `subContentExtraction.py` - ContentExtractor class
- `extractAndPrepareContent()` (~490 lines)
- `extractTextFromImage()` (~55 lines)
- `processTextContentWithAi()` (~72 lines)
- `_isBinary()` (~10 lines)
4. `subStructureGeneration.py` - StructureGenerator class
- `generateStructure()` (~60 lines)
- `_buildStructurePrompt()` (~130 lines)
5. `subStructureFilling.py` - StructureFiller class
- `fillStructure()` (~290 lines)
- `_buildSectionGenerationPrompt()` (~185 lines)
- `_findContentPartById()` (~5 lines)
- `_needsAggregation()` (~20 lines)
6. `subAiCallLooping.py` - AiCallLooper class
- `callAiWithLooping()` (~405 lines)
- `_defineKpisFromPrompt()` (~92 lines)
## Refactoring Steps for mainServiceAi.py
### Step 1: Extend submodule initialization
```python
def _initializeSubmodules(self):
    """Initialize all submodules after aiObjects is ready."""
    if self.aiObjects is None:
        raise RuntimeError("aiObjects must be initialized before initializing submodules")
    if self.extractionService is None:
        logger.info("Initializing ExtractionService...")
        self.extractionService = ExtractionService(self.services)
    # Initialize new submodules
    from modules.services.serviceAi.subResponseParsing import ResponseParser
    from modules.services.serviceAi.subDocumentIntents import DocumentIntentAnalyzer
    from modules.services.serviceAi.subContentExtraction import ContentExtractor
    from modules.services.serviceAi.subStructureGeneration import StructureGenerator
    from modules.services.serviceAi.subStructureFilling import StructureFiller
    if not hasattr(self, 'responseParser'):
        self.responseParser = ResponseParser(self.services)
    if not hasattr(self, 'intentAnalyzer'):
        self.intentAnalyzer = DocumentIntentAnalyzer(self.services, self)
    if not hasattr(self, 'contentExtractor'):
        self.contentExtractor = ContentExtractor(self.services, self)
    if not hasattr(self, 'structureGenerator'):
        self.structureGenerator = StructureGenerator(self.services, self)
    if not hasattr(self, 'structureFiller'):
        self.structureFiller = StructureFiller(self.services, self)
```
### Step 2: Replace methods with delegation
**Example for response parsing:**
```python
# OLD:
def _extractSectionsFromResponse(self, ...):
    # 100 lines of code
    ...

# NEW:
def _extractSectionsFromResponse(self, ...):
    return self.responseParser.extractSectionsFromResponse(...)
```
**Example for document intents:**
```python
# OLD:
async def _clarifyDocumentIntents(self, ...):
    # 100 lines of code
    ...

# NEW:
async def _clarifyDocumentIntents(self, ...):
    return await self.intentAnalyzer.clarifyDocumentIntents(...)
```
### Step 3: Keep helper methods
Small helper methods remain in the main module:
- `_buildPromptWithPlaceholders()`
- `_getIntentForDocument()`
- `_shouldSkipContentPart()`
- `_determineDocumentName()`
### Step 4: Leave the public API unchanged
The public API (`callAiPlanning`, `callAiContent`) remains unchanged.
## Expected Resulting Sizes
- `mainServiceAi.py`: ~800-1000 lines (down from 3016)
- `subResponseParsing.py`: ~200 lines ✅
- `subDocumentIntents.py`: ~300 lines ✅
- `subContentExtraction.py`: ~600 lines
- `subStructureGeneration.py`: ~200 lines
- `subStructureFilling.py`: ~400 lines
- `subAiCallLooping.py`: ~500 lines
**Total: ~3000 lines** (unchanged overall, but better organized)
## Benefits
1. **Clarity**: Each module has a single, clear responsibility
2. **Maintainability**: Changes are localized
3. **Testability**: Modules can be tested individually
4. **Reusability**: Modules can be used in other contexts


@@ -676,6 +676,7 @@ Respond with ONLY a JSON object in this exact format:
)
# Step 5D: Fill structure
# Language will be extracted from services (user intention analysis) in fillStructure
filledStructure = await self._fillStructure(
structure,
contentParts or [],


@@ -14,7 +14,7 @@ from typing import Dict, Any, List, Optional, Callable
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum, JsonAccumulationState
from modules.datamodels.datamodelExtraction import ContentPart
from modules.shared.jsonUtils import buildContinuationContext, extractJsonString
from modules.shared.jsonUtils import buildContinuationContext, extractJsonString, tryParseJson
from modules.services.serviceAi.subJsonResponseHandling import JsonResponseHandler
logger = logging.getLogger(__name__)
@@ -122,10 +122,14 @@ class AiCallLooper:
)
# Write the ACTUAL prompt sent to AI
if iteration == 1:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
else:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
# For section content generation: only write one prompt file (first iteration)
# For document generation: write prompt for each iteration
isSectionContent = "_section_" in debugPrefix
if iteration == 1 or not isSectionContent:
if iteration == 1:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt")
elif not isSectionContent:
self.services.utils.writeDebugFile(iterationPrompt, f"{debugPrefix}_prompt_iteration_{iteration}")
response = await self.aiService.callAi(request)
result = response.content
@@ -146,10 +150,13 @@
self.services.chat.progressLogUpdate(iterationOperationId, 0.6, f"AI response received ({bytesDisplay})")
# Write raw AI response to debug file
if iteration == 1:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
else:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# For section content generation: only write one response file (first iteration)
# For document generation: write response for each iteration
if iteration == 1 or not isSectionContent:
if iteration == 1:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response")
elif not isSectionContent:
self.services.utils.writeDebugFile(result, f"{debugPrefix}_response_iteration_{iteration}")
# Emit stats for this iteration (only if workflow exists and has id)
if self.services.workflow and hasattr(self.services.workflow, 'id') and self.services.workflow.id:
@@ -192,6 +199,38 @@
# Store raw response for continuation (even if broken)
lastRawResponse = result
# Check if this is section content generation (has "elements" not "sections")
# Section content generation returns JSON with "elements" array, not document structure with "sections"
isSectionContentGeneration = False
parsedJsonForSection = None
extractedJsonForSection = None
try:
extractedJsonForSection = extractJsonString(result)
parsedJson, parseError, _ = tryParseJson(extractedJsonForSection)
if parseError is None and parsedJson:
parsedJsonForSection = parsedJson
# Check if JSON has "elements" (section content) or "sections" (document structure)
if isinstance(parsedJson, dict):
if "elements" in parsedJson:
isSectionContentGeneration = True
elif isinstance(parsedJson, list) and len(parsedJson) > 0:
# Check if it's a list of elements (section content format)
if isinstance(parsedJson[0], dict) and "type" in parsedJson[0]:
isSectionContentGeneration = True
except Exception:
pass
if isSectionContentGeneration:
# This is section content generation - return the JSON directly
# No need to extract sections, just return the complete JSON string
logger.info(f"Iteration {iteration}: Section content generation detected (elements found), returning JSON directly")
if iterationOperationId:
self.services.chat.progressLogFinish(iterationOperationId, True)
# Note: Debug files (_prompt and _response) are already written above for iteration 1
# No need to write _final_result as it's redundant with _response
final_json = json.dumps(parsedJsonForSection, indent=2, ensure_ascii=False) if parsedJsonForSection else (extractedJsonForSection or result)
return final_json
# Extract sections from response (handles both valid and broken JSON)
# Only for document generation (JSON responses)
# CRITICAL: Pass allSections and accumulationState to enable string accumulation
@@ -365,7 +404,10 @@
self.services.chat.progressLogUpdate(operationId, estimatedProgress, f"Pipeline: {bytesDisplay} (iteration {iteration})")
# Log merged sections for debugging
self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
# For section content generation: skip merged sections debug files (only one prompt/response needed)
isSectionContent = "_section_" in debugPrefix
if not isSectionContent:
self.services.utils.writeDebugFile(merged_json_str, f"{debugPrefix}_merged_sections_iteration_{iteration}")
# Check if we should continue (completion detection)
# Simple logic: JSON completeness determines continuation
@@ -433,7 +475,10 @@
final_result = self.responseParser.buildFinalResultFromSections(allSections, documentMetadata)
# Write final result to debug file
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
# For section content generation: skip final_result debug file (response already written)
isSectionContent = "_section_" in debugPrefix
if not isSectionContent:
self.services.utils.writeDebugFile(final_result, f"{debugPrefix}_final_result")
return final_result

File diff suppressed because it is too large


@@ -24,6 +24,20 @@ class StructureGenerator:
self.services = services
self.aiService = aiService
def _getUserLanguage(self) -> str:
"""Get user language for document generation"""
try:
if self.services:
# Prefer detected language if available (from user intention analysis)
if hasattr(self.services, 'currentUserLanguage') and self.services.currentUserLanguage:
return self.services.currentUserLanguage
# Fallback to user's preferred language
elif hasattr(self.services, 'user') and self.services.user and hasattr(self.services.user, 'language'):
return self.services.user.language
except Exception:
pass
return 'en' # Default fallback
async def generateStructure(
self,
userPrompt: str,
@@ -76,7 +90,30 @@
)
# Parse structure
structure = json.loads(self.services.utils.jsonExtractString(aiResponse))
# Use tryParseJson which handles malformed JSON and unterminated strings
extractedJson = self.services.utils.jsonExtractString(aiResponse)
parsedJson, parseError, cleanedJson = self.services.utils.jsonTryParse(extractedJson)
if parseError is not None:
# Try to repair broken JSON (handles unterminated strings, incomplete structures, etc.)
logger.warning(f"Initial JSON parsing failed: {str(parseError)}. Attempting repair...")
from modules.shared import jsonUtils
repairedJson = jsonUtils.repairBrokenJson(extractedJson)
if repairedJson:
# Try parsing repaired JSON
parsedJson, parseError, _ = self.services.utils.jsonTryParse(json.dumps(repairedJson))
if parseError is None:
logger.info("Successfully repaired and parsed JSON structure")
structure = parsedJson
else:
logger.error(f"Failed to parse repaired JSON: {str(parseError)}")
raise ValueError(f"Failed to parse JSON structure after repair: {str(parseError)}")
else:
logger.error(f"Failed to repair JSON. Parse error: {str(parseError)}")
logger.error(f"Cleaned JSON preview (first 500 chars): {cleanedJson[:500]}")
raise ValueError(f"Failed to parse JSON structure: {str(parseError)}")
else:
structure = parsedJson
# Finish ChatLog
self.services.chat.progressLogFinish(structureOperationId, True)
@@ -145,11 +182,17 @@
if not contentPartsIndex:
contentPartsIndex = "\n(No content parts available)"
# Get language from services (user intention analysis)
language = self._getUserLanguage()
logger.debug(f"Using language from services (user intention analysis) for structure generation: {language}")
prompt = f"""USER REQUEST (for context):
```
{userPrompt}
```
LANGUAGE: Generate all content in {language.upper()} language. All text, titles, headings, paragraphs, and content must be written in {language.upper()}.
AVAILABLE CONTENT PARTS:
{contentPartsIndex}
@@ -199,7 +242,7 @@ RETURN JSON:
{{
"metadata": {{
"title": "Document Title",
"language": "de"
"language": "{language}"
}},
"documents": [{{
"id": "doc_1",


@@ -417,7 +417,10 @@ class RendererHtml(BaseRenderer):
if htmlParts:
return '\n'.join(htmlParts)
return self._renderJsonParagraph(sectionData, styles)
# If sectionData is not a list, treat it as a dict
if isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData, styles)
return ""
elif sectionType == "code_block":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
@@ -451,7 +454,9 @@
if htmlParts:
return '\n'.join(htmlParts)
# Fallback to paragraph for unknown types
return self._renderJsonParagraph(sectionData, styles)
if isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData, styles)
return ""
except Exception as e:
self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}")


@@ -162,7 +162,13 @@ class RendererMarkdown(BaseRenderer):
return self._renderJsonHeading(element)
return ""
elif sectionType == "paragraph":
return self._renderJsonParagraph(sectionData)
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonParagraph(element)
elif isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData)
return ""
elif sectionType == "code_block":
# Work directly with elements like other renderers
if isinstance(sectionData, list) and sectionData:
@@ -177,7 +183,12 @@
return ""
else:
# Fallback to paragraph for unknown types
return self._renderJsonParagraph(sectionData)
if isinstance(sectionData, list) and sectionData:
element = sectionData[0] if isinstance(sectionData[0], dict) else {}
return self._renderJsonParagraph(element)
elif isinstance(sectionData, dict):
return self._renderJsonParagraph(sectionData)
return ""
except Exception as e:
self.logger.warning(f"Error rendering section {self._getSectionId(section)}: {str(e)}")


@@ -8,6 +8,8 @@ Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
import json
import logging
import time
import asyncio
from urllib.parse import urlparse
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
@@ -99,12 +101,18 @@
self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")
# Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
if len(allUrls) > maxNumberPages:
allUrls = allUrls[:maxNumberPages]
# Step 3: Validate and filter URLs before crawling
validatedUrls = self._validateUrls(allUrls)
if not validatedUrls:
logger.warning(f"All {len(allUrls)} URLs failed validation")
return {"error": "No valid URLs found to crawl"}
# Filter to maxNumberPages (simple cut, no intelligent filtering)
if len(validatedUrls) > maxNumberPages:
validatedUrls = validatedUrls[:maxNumberPages]
logger.info(f"Limited URLs to {maxNumberPages}")
if not allUrls:
if not validatedUrls:
return {"error": "No URLs found to crawl"}
# Step 4: Translate researchDepth to maxDepth
@@ -114,14 +122,14 @@
# Step 5: Crawl all URLs with hierarchical logging
if operationId:
self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating")
self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs")
# Use parent operation ID directly (parentId should be operationId, not log entry ID)
parentOperationId = operationId # Use the parent's operationId directly
crawlResult = await self._performWebCrawl(
instruction=instruction,
urls=allUrls,
urls=validatedUrls,
maxDepth=maxDepth,
parentOperationId=parentOperationId
)
@@ -194,24 +202,24 @@
"max_depth": maxDepth,
"country": countryCode,
"language": languageCode,
"urls_crawled": allUrls[:20], # First 20 URLs for reference
"total_urls": len(allUrls),
"urls_crawled": validatedUrls[:20], # First 20 URLs for reference
"total_urls": len(validatedUrls),
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength,
"crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None
},
"sections": sections,
"statistics": {
"sectionCount": len(sections),
"total_urls": len(allUrls),
"sectionCount": len(sections),
"total_urls": len(validatedUrls),
"results_count": totalResults,
"urls_with_content": urlsWithContent,
"total_content_length": totalContentLength
},
# Keep original structure for backward compatibility
"instruction": instruction,
"urls_crawled": allUrls,
"total_urls": len(allUrls),
"urls_crawled": validatedUrls,
"total_urls": len(validatedUrls),
"results": crawlResult,
"total_results": totalResults
}
@@ -383,6 +391,50 @@ Return ONLY valid JSON, no additional text:
logger.error(f"Error in web search: {str(e)}")
return []
def _validateUrls(self, urls: List[str]) -> List[str]:
"""
Validate URLs before crawling - filters out invalid URLs.
Args:
urls: List of URLs to validate
Returns:
List of valid URLs
"""
validatedUrls = []
for url in urls:
if not url or not isinstance(url, str):
logger.debug(f"Skipping invalid URL (not a string): {url}")
continue
url = url.strip()
if not url:
logger.debug(f"Skipping empty URL")
continue
# Basic URL validation using urlparse
try:
parsed = urlparse(url)
# Check if URL has at least scheme and netloc
if not parsed.scheme or not parsed.netloc:
logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}")
continue
# Only allow http/https schemes
if parsed.scheme not in ['http', 'https']:
logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}")
continue
validatedUrls.append(url)
logger.debug(f"Validated URL: {url}")
except Exception as e:
logger.warning(f"Error validating URL '{url}': {str(e)}")
continue
logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs")
return validatedUrls
async def _performWebCrawl(
self,
instruction: str,
@@ -390,117 +442,165 @@
maxDepth: int = 2,
parentOperationId: Optional[str] = None
) -> List[Dict[str, Any]]:
"""Perform web crawl on list of URLs - calls plugin for each URL individually."""
crawlResults = []
# Loop over each URL and crawl one at a time
"""Perform web crawl on list of URLs - crawls URLs in parallel for better performance."""
# Create tasks for parallel crawling
crawlTasks = []
for urlIndex, url in enumerate(urls):
# Create separate operation for each URL with parent reference
urlOperationId = None
if parentOperationId:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
self.services.chat.progressLogStart(
urlOperationId,
"Web Crawl",
f"URL {urlIndex + 1}",
url[:50] + "..." if len(url) > 50 else url,
parentOperationId=parentOperationId
)
try:
logger.info(f"Crawling URL {urlIndex + 1}/{len(urls)}: {url}")
if urlOperationId:
displayUrl = url[:50] + "..." if len(url) > 50 else url
self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
# Build crawl prompt model for single URL
crawlPromptModel = AiCallPromptWebCrawl(
instruction=instruction,
url=url, # Single URL
maxDepth=maxDepth,
maxWidth=5 # Default: 5 pages per level
)
crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
# Debug: persist crawl prompt (with URL identifier in content for clarity)
debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
# Call AI with WEB_CRAWL operation
crawlOptions = AiCallOptions(
operationType=OperationTypeEnum.WEB_CRAWL,
resultFormat="json"
)
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
# Use unified callAiContent method with parentOperationId for hierarchical logging
crawlResponse = await self.services.ai.callAiContent(
prompt=crawlPrompt,
options=crawlOptions,
outputFormat="json",
parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
)
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
# Extract content from AiResponse
crawlResult = crawlResponse.content
# Debug: persist crawl response
if isinstance(crawlResult, str):
self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
else:
self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
# Parse crawl result
if isinstance(crawlResult, str):
try:
# Extract JSON from response (handles markdown code blocks)
extractedJson = self.services.utils.jsonExtractString(crawlResult)
crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
except:
crawlData = {"url": url, "content": crawlResult}
else:
crawlData = crawlResult
# Process crawl results and create hierarchical progress logging for sub-URLs
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
# Recursively process crawl results to find nested URLs and create child operations
processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
# Count total URLs crawled (including sub-URLs) for progress message
totalUrlsCrawled = self._countUrlsInResults(processedResults)
# Ensure it's a list of results
if isinstance(processedResults, list):
crawlResults.extend(processedResults)
elif isinstance(processedResults, dict):
crawlResults.append(processedResults)
else:
crawlResults.append({"url": url, "content": str(processedResults)})
if urlOperationId:
if totalUrlsCrawled > 1:
self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
else:
self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
self.services.chat.progressLogFinish(urlOperationId, True)
except Exception as e:
logger.error(f"Error crawling URL {url}: {str(e)}")
if urlOperationId:
self.services.chat.progressLogFinish(urlOperationId, False)
crawlResults.append({"url": url, "error": str(e)})
task = self._crawlSingleUrl(
url=url,
urlIndex=urlIndex,
totalUrls=len(urls),
instruction=instruction,
maxDepth=maxDepth,
parentOperationId=parentOperationId
)
crawlTasks.append(task)
return crawlResults
# Execute all crawl tasks in parallel
logger.info(f"Starting parallel crawl of {len(urls)} URLs")
crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True)
# Process results and handle exceptions
processedResults = []
for idx, result in enumerate(crawlResults):
if isinstance(result, Exception):
logger.error(f"Error crawling URL {urls[idx]}: {str(result)}")
processedResults.append({"url": urls[idx], "error": str(result)})
else:
processedResults.extend(result if isinstance(result, list) else [result])
logger.info(f"Completed parallel crawl: {len(processedResults)} results")
return processedResults
async def _crawlSingleUrl(
self,
url: str,
urlIndex: int,
totalUrls: int,
instruction: str,
maxDepth: int,
parentOperationId: Optional[str] = None
) -> List[Dict[str, Any]]:
"""
Crawl a single URL - called in parallel for multiple URLs.
Args:
url: URL to crawl
urlIndex: Index of URL in the list
totalUrls: Total number of URLs being crawled
instruction: Research instruction
maxDepth: Maximum crawl depth
parentOperationId: Parent operation ID for progress tracking
Returns:
List of crawl results for this URL
"""
# Create separate operation for each URL with parent reference
urlOperationId = None
if parentOperationId:
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}"
self.services.chat.progressLogStart(
urlOperationId,
"Web Crawl",
f"URL {urlIndex + 1}/{totalUrls}",
url[:50] + "..." if len(url) > 50 else url,
parentOperationId=parentOperationId
)
try:
logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}")
if urlOperationId:
displayUrl = url[:50] + "..." if len(url) > 50 else url
self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}")
self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl")
# Build crawl prompt model for single URL
crawlPromptModel = AiCallPromptWebCrawl(
instruction=instruction,
url=url, # Single URL
maxDepth=maxDepth,
maxWidth=5 # Default: 5 pages per level
)
crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
# Debug: persist crawl prompt (with URL identifier in content for clarity)
debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")
# Call AI with WEB_CRAWL operation
crawlOptions = AiCallOptions(
operationType=OperationTypeEnum.WEB_CRAWL,
resultFormat="json"
)
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector")
# Use unified callAiContent method with parentOperationId for hierarchical logging
crawlResponse = await self.services.ai.callAiContent(
prompt=crawlPrompt,
options=crawlOptions,
outputFormat="json",
parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging
)
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results")
# Extract content from AiResponse
crawlResult = crawlResponse.content
# Debug: persist crawl response
if isinstance(crawlResult, str):
self.services.utils.writeDebugFile(crawlResult, "webcrawl_response")
else:
self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response")
# Parse crawl result
if isinstance(crawlResult, str):
try:
# Extract JSON from response (handles markdown code blocks)
extractedJson = self.services.utils.jsonExtractString(crawlResult)
crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult)
except:
crawlData = {"url": url, "content": crawlResult}
else:
crawlData = crawlResult
# Process crawl results and create hierarchical progress logging for sub-URLs
if urlOperationId:
self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results")
# Recursively process crawl results to find nested URLs and create child operations
processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0)
# Count total URLs crawled (including sub-URLs) for progress message
totalUrlsCrawled = self._countUrlsInResults(processedResults)
# Ensure it's a list of results
if isinstance(processedResults, list):
results = processedResults
elif isinstance(processedResults, dict):
results = [processedResults]
else:
results = [{"url": url, "content": str(processedResults)}]
if urlOperationId:
if totalUrlsCrawled > 1:
self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)")
else:
self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed")
self.services.chat.progressLogFinish(urlOperationId, True)
return results
except Exception as e:
logger.error(f"Error crawling URL {url}: {str(e)}")
if urlOperationId:
self.services.chat.progressLogFinish(urlOperationId, False)
return [{"url": url, "error": str(e)}]
def _processCrawlResultsWithHierarchy(
self,


@@ -132,13 +132,17 @@ class ProgressLogger:
return None
op = self.activeOperations[operationId]
message = f"Service {op['service']}"
message = f"{op['service']}"
workflow = self.services.workflow
if not workflow:
logger.warning(f"Cannot log progress: no workflow available")
return None
# Validate parentOperationId exists in activeOperations (for debugging)
if parentOperationId and parentOperationId not in self.activeOperations:
logger.debug(f"WARNING: Parent operation '{parentOperationId}' not found in activeOperations when creating log for '{operationId}'. Available operations: {list(self.activeOperations.keys())}. Child operation may appear at root level.")
# parentId in ChatLog should be the operationId of the parent operation, not the log entry ID
logData = {
"workflowId": workflow.id,


@@ -38,6 +38,11 @@ async def process(self, parameters: Dict[str, Any]) -> ActionResult:
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
if not parentOperationId:
logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
else:
logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")
self.services.chat.progressLogStart(
operationId,
"Generate",


@@ -188,9 +188,19 @@ class MethodBase:
return wrapper
def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
"""Validate parameters against definitions"""
"""Validate parameters against definitions
IMPORTANT: System parameters (like parentOperationId, expectedDocumentFormats) are preserved
even if they're not in the parameter definitions, as they're used internally by the framework.
"""
validated = {}
# System parameters that should always be preserved, even if not in paramDefs
systemParams = ['parentOperationId', 'expectedDocumentFormats']
for sysParam in systemParams:
if sysParam in parameters:
validated[sysParam] = parameters[sysParam]
for paramName, paramDef in paramDefs.items():
value = parameters.get(paramName)