# PowerOn AI System Documentation - Validators and Workflow Mechanisms

## Table of Contents

1. [Overview](#overview)
2. [Dynamic Workflow](#dynamic-workflow)
3. [Generic AI Call Flow](#generic-ai-call-flow)
4. [Standardized Data Models](#standardized-data-models)
5. [Validation with KPIs](#validation-with-kpis)
6. [Code Structure](#code-structure)
7. [Further Important Topics](#further-important-topics)

---

## Overview

The PowerOn AI system is a multi-layered system for processing documents and generating content, with integrated validation mechanisms. It uses a dynamic workflow approach with standardized data models and multi-stage validators.

### Core Components

- **Workflow Processor**: Orchestrates the execution of tasks and actions
- **AI Service**: Central interface for all AI operations
- **Extraction Service**: Extracts content from documents
- **Generation Service**: Generates and renders documents
- **Content Validator**: Validates generated content against the user intent
- **Workflow Validator**: Validates workflow structures

---

## Dynamic Workflow

### Workflow Architecture

The dynamic workflow is based on a **Round → Task → Action** hierarchy:

```
User Request
    ↓
Round (every user input = new round)
    ↓
Task (multiple tasks per round possible)
    ↓
Action (multiple actions per task possible)
```

### Standardized Elements

#### 1. ChatWorkflow (State Management)

The `ChatWorkflow` object manages the entire execution state:

```python
class ChatWorkflow(BaseModel):
    # Execution state
    currentRound: int = 0   # Current round
    currentTask: int = 0    # Current task within the round
    currentAction: int = 0  # Current action within the task

    def incrementRound(self):
        """Increment round when new user input received"""
        self.currentRound += 1
        self.currentTask = 0
        self.currentAction = 0

    def incrementTask(self):
        """Increment task when starting new task"""
        self.currentTask += 1
        self.currentAction = 0

    def incrementAction(self):
        """Increment action when executing new action"""
        self.currentAction += 1
```

**Important**: All execution state is managed in the `ChatWorkflow` object and is not passed through the call chain as separate parameters.

#### 2. TaskContext (Context Management)

The `TaskContext` holds all information needed to execute a task:

```python
class TaskContext(BaseModel):
    # Stage 1 context fields
    actionObjective: Optional[str] = None
    parametersContext: Optional[str] = None
    learnings: List[str] = Field(default_factory=list)
    stage1Selection: Optional[Dict[str, Any]] = None

    def updateFromSelection(self, selection: ActionDefinition):
        """Update context from Stage 1 selection"""
        self.actionObjective = selection.actionObjective
        self.parametersContext = selection.parametersContext
        self.learnings = selection.learnings
        self.stage1Selection = selection.model_dump()
```

#### 3. ActionDefinition (Action Selection and Parameters)

The `ActionDefinition` combines action selection and parameters:

```python
class ActionDefinition(BaseModel):
    # Core action selection (Stage 1)
    action: str = Field(description="Compound action name (method.action)")
    actionObjective: str = Field(description="Objective for this action")
    parametersContext: Optional[str] = Field(None, description="Context for parameter generation")
    learnings: List[str] = Field(default_factory=list, description="Learnings from previous actions")

    # Resources (ALWAYS defined in Stage 1 if action needs them)
    documentList: Optional[DocumentReferenceList] = Field(None, description="Document references")
    connectionReference: Optional[str] = Field(None, description="Connection reference")

    # Parameters (may be defined in Stage 1 OR Stage 2)
    parameters: Optional[Dict[str, Any]] = Field(None, description="Action-specific parameters")

    def needsStage2(self) -> bool:
        """Determine if Stage 2 parameter generation is needed"""
        return self.parameters is None
```

### Workflow Sequence

#### High-Level Flow

```
User Request (prompt + documents)
    ↓
Request Reception → RequestContext
    ↓
Complexity Detection → "simple" | "moderate" | "complex"
    ↓
[If simple] Fast Path
    ├─> fastPathExecute() → ActionResult
    └─> Return to user (5-15s)
    ↓
[If complex] Full Workflow
    ├─> initialUnderstanding() → UnderstandingResult
    ├─> Create TaskDefinition[] from UnderstandingResult
    ├─> For each task:
    │   ├─> executeTask(TaskDefinition) → TaskResult
    │   │   ├─> Web Research (if needed)
    │   │   ├─> Document Extraction (separate action)
    │   │   ├─> Information Analysis (AI with extracted content)
    │   │   ├─> Content Generation (AI with extracted content)
    │   │   └─> Format Rendering (unified JSON → format)
    │   └─> persistTaskResult() → ChatMessage
    └─> Return to user (30-120s)
```

#### Action-Level Flow (Within a Task)

```
modeDynamic.executeTask(taskStep, workflow, context)
    ↓
_planSelect(context) → ActionDefinition
    ├─> aiService.callAiPlanning() → AiResponse
    └─> parseJsonWithModel() → ActionDefinition
    ↓
_actExecute(context, selection, ...) → ActionResult
    ├─> context.updateFromSelection(selection)
    ├─> **If selection.needsStage2()** (Stage 2 needed):
    │   ├─> aiService.callAiPlanning() → AiResponse
    │   └─> parseJsonWithModel() → Update selection with parameters
    └─> actionExecutor.executeSingleAction() → ActionResult
        └─> methodAi.process(parameters) → ActionResult
            └─> aiService.callAiContent(prompt, contentParts=...) → AiResponse
    ↓
_observeBuild() → Observation
    ↓
_refineDecide() → ReviewResult
```

### Validation Mechanisms in the Workflow

#### 1. Workflow Validator

Validates the structure of task plans and actions:

```python
class WorkflowValidator:
    def validateTask(self, taskPlan: Dict[str, Any]) -> bool:
        """Validate task plan structure"""
        # Checks:
        # - Task structure (id, objective, successCriteria)
        # - Dependencies (no cyclic dependencies)
        # - Unique task IDs

    def validateAction(self, actions: List[Dict[str, Any]], context) -> bool:
        """Validate action structure"""
        # Checks:
        # - Action structure (action, parameters, resultLabel)
        # - Compound action format (method.action)
        # - Result labels start with 'round'
```

#### 2. Parameter Validation

Parameters are validated against the action schemas:

```python
# In MethodBase._validateParameters()
def _validateParameters(self, parameters: Dict[str, Any], paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
    """Validate parameters against definitions"""
    validated = {}
    for paramName, paramDef in paramDefs.items():
        value = parameters.get(paramName)

        # Check required
        if paramDef.required and value is None:
            raise ValueError(f"Required parameter '{paramName}' is missing")

        # Use default if not provided
        if value is None and paramDef.default is not None:
            value = paramDef.default

        # Type validation
        if value is not None:
            value = self._validateType(value, paramDef.type)

        # Custom validation rules
        if paramDef.validation and value is not None:
            self._applyValidationRules(value, paramDef.validation)

        validated[paramName] = value
    return validated
```

---

## Generic AI Call Flow

### Overview: Extraction → Neutralization → Generation → Rendering

The system follows a four-phase model:

```
1. EXTRACTION: Documents → ContentParts
2. NEUTRALIZATION: ContentParts → Neutralized ContentParts (optional)
3. GENERATION: ContentParts + Prompt → Unified JSON
4. RENDERING: Unified JSON → Format (DOCX, PDF, XLSX, etc.)
```

### Phase 1: Extraction

#### Separate Extraction Action

**Important**: Extraction is a **separate action**, not part of the AI call!

```python
# Extraction Action: document.extractContent
async def extractContent(
    parameters: ExtractContentParameters  # Pydantic: ExtractContentParameters
) -> ActionResult:  # Pydantic: ActionResult
    ...
```

**Parameters Model**:

```python
class ExtractContentParameters(BaseModel):
    documentList: DocumentReferenceList = Field(description="Document references")
    extractionOptions: Optional[ExtractionOptions] = Field(
        None,
        description="Extraction options (dynamic, not hardcoded)"
    )
```

**Returns**: `ActionResult` with an `ActionDocument` containing `ContentExtracted` objects.
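
As a minimal sketch of the nesting in this return value, the following uses plain dataclasses as simplified stand-ins for the Pydantic models (the real models carry more fields). The helper `collectParts` is hypothetical and not part of the documented API, and the assumption that `documentData` holds the `ContentExtracted` object directly is made only for illustration.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ContentPart:
    id: str
    label: str
    typeGroup: str
    mimeType: str
    data: str = ""


@dataclass
class ContentExtracted:
    id: str
    parts: List[ContentPart] = field(default_factory=list)


@dataclass
class ActionDocument:
    documentName: str
    documentData: ContentExtracted  # assumption: payload is the extraction result itself
    mimeType: str = "application/json"


@dataclass
class ActionResult:
    success: bool
    documents: List[ActionDocument] = field(default_factory=list)


def collectParts(result: ActionResult) -> List[ContentPart]:
    """Hypothetical helper: flatten all ContentParts of a successful result."""
    if not result.success:
        return []
    return [part for doc in result.documents for part in doc.documentData.parts]


result = ActionResult(
    success=True,
    documents=[
        ActionDocument(
            documentName="doc1.pdf.extracted",
            documentData=ContentExtracted(
                id="doc1",
                parts=[
                    ContentPart("p1", "Page 1 text", "text", "text/plain", "Hello"),
                    ContentPart("p2", "Table 1", "table", "text/csv", "a,b\n1,2"),
                ],
            ),
        )
    ],
)
labels = [part.label for part in collectParts(result)]
```

A later generation step would receive exactly this flat list of parts rather than the original documents.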
#### Two-Phase Extraction Model

##### Phase 1: Pure Content Extraction (No AI)

**Method**: `extractionService.extractContent(documents, options)`

**Characteristics**:
- ✅ **No size limits** - Extracts complete documents regardless of size
- ✅ **No AI calls** - Purely format-specific extraction (PDF, DOCX, etc.)
- ✅ **No chunking** - The complete content is extracted as-is into ContentPart objects
- ✅ **Format-aware** - Uses the matching extractor for each document type

**Output**: `List[ContentExtracted]`, where each item contains a `List[ContentPart]`

##### Phase 2: AI-Based Content Extraction (With Intelligent Model-Aware Chunking)

**Method**: `extractionService.processDocumentsPerChunk(documents, prompt, aiObjects, options)`

**Characteristics**:
- ✅ **Model-aware chunking** - Chunk size is calculated dynamically based on the selected model
- ✅ **No fixed sizes** - Only the model's `contextLength` and `maxTokens` act as limits
- ✅ **Per-call model selection** - Each AI call can use a different model (failover)
- ✅ **Dynamic chunking** - Chunking is recalculated for each model in the failover chain

**Chunking calculation** (per model):

```python
modelContextTokens = model.contextLength   # Model's total context window
modelMaxOutputTokens = model.maxTokens     # Model's max output tokens
promptSize = len(prompt.encode('utf-8'))   # Input prompt size in bytes

# Calculate prompt tokens (approximate: 1 token ≈ 4 bytes)
promptTokens = promptSize / 4

# Reserve tokens for:
systemMessageTokens = 10                   # System message overhead
outputTokens = modelMaxOutputTokens        # Output reservation
messageOverheadTokens = 100                # JSON/message structure overhead
totalReservedTokens = promptTokens + systemMessageTokens + messageOverheadTokens + outputTokens

# Calculate remaining context available for content:
remainingContextTokens = modelContextTokens - totalReservedTokens

# Use 80% of the remaining context for content (20% safety margin):
availableContentTokens = int(remainingContextTokens * 0.8)
availableContentBytes = availableContentTokens * 4   # Convert back to bytes

# Chunk size (70% of available for text, 80% for images):
textChunkSize = int(availableContentBytes * 0.7)
imageChunkSize = int(availableContentBytes * 0.8)
```

**Pipeline Architecture**:

```
for each ContentPart:
    # Initialize pipeline state
    processedChunks = []                   # Already processed chunks
    remainingContent = contentPart.data    # Content not yet chunked/processed

    while remainingContent:
        1. Model Selection: Select best model for remaining content
        2. Calculate chunk size for CURRENT model
        3. Create NEXT chunk on-demand (lazy chunking)
        4. Process chunk with AI
        5. Update pipeline state
        6. If model fails: Select next model, recalculate chunk size
```

### Phase 2: Neutralization (Optional)

#### Neutralization Action

```python
# Neutralization Action: context.neutralizeData
async def neutralizeData(
    parameters: NeutralizeDataParameters
) -> ActionResult:
    ...
```

**Purpose**: Removes personal or sensitive data from ContentParts before generation.
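
To illustrate what neutralizing ContentParts while keeping their structure can look like, here is a minimal sketch. It deliberately swaps in a simple regular expression for e-mail addresses in place of the AI-based identification that the actual action uses. The names `neutralizeParts`, `EMAIL_PATTERN`, and the `[EMAIL]` placeholder are hypothetical, and `ContentPart` is reduced to a dataclass stand-in.

```python
import re
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class ContentPart:
    id: str
    label: str
    typeGroup: str
    mimeType: str
    data: str = ""


# Simplified pattern for illustration; real-world e-mail detection needs more care.
EMAIL_PATTERN = re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+")


def neutralizeParts(parts: List[ContentPart]) -> List[ContentPart]:
    """Return copies of the parts with e-mail addresses masked in text payloads."""
    neutralized = []
    for part in parts:
        if part.typeGroup == "text":
            masked = EMAIL_PATTERN.sub("[EMAIL]", part.data)
            neutralized.append(replace(part, data=masked))
        else:
            neutralized.append(part)  # non-text payloads are passed through unchanged
    return neutralized


parts = [
    ContentPart("p1", "Body", "text", "text/plain", "Contact jane.doe@example.com today."),
    ContentPart("p2", "Logo", "binary", "image/png", "iVBORw0KGgo="),
]
cleaned = neutralizeParts(parts)
```

Only the `data` payload changes; ids, labels, type groups, and the number of parts stay the same, so downstream steps can consume the result like any other list of ContentParts.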
**Characteristics**:
- Optional - only when needed
- Uses AI to identify and neutralize sensitive data
- Preserves the structure of the ContentParts

### Phase 3: Generation

#### AI Service: callAiContent()

**Signature**:

```python
async def callAiContent(
    prompt: str,                                        # Proprietary: str
    contentParts: Optional[List[ContentPart]] = None,   # Pydantic: List[ContentPart]
    options: AiCallOptions,                             # Pydantic: AiCallOptions (REQUIRED)
    outputFormat: Optional[str] = None,                 # Proprietary: str (for document generation)
    title: Optional[str] = None                         # Proprietary: str (for document generation)
) -> AiResponse                                         # Pydantic: AiResponse
```

**Important**:
- ✅ **ContentParts only** - Documents must be extracted beforehand
- ✅ **REQUIRED**: `options.operationType` must be set
- ✅ **No extraction logic** - `callAiContent()` itself never extracts; it only consumes ContentParts

**Operation Types**:

```python
class OperationTypeEnum(str, Enum):
    PLAN = "plan"                      # Planning operations
    DATA_ANALYSE = "dataAnalyse"       # Data analysis
    DATA_GENERATE = "dataGenerate"     # Data generation
    DATA_EXTRACT = "dataExtract"       # Data extraction
    IMAGE_ANALYSE = "imageAnalyse"     # Image analysis
    IMAGE_GENERATE = "imageGenerate"   # Image generation
    WEB_SEARCH = "webSearch"           # Web search (returns URLs)
    WEB_CRAWL = "webCrawl"             # Web crawl (returns content)
```

**Call Flow for Document Generation**:

```
callAiContent(prompt, contentParts, options, outputFormat, title)
    ↓
_preparePromptWithContentParts(prompt, contentParts) → str
    ↓
buildGenerationPrompt(outputFormat, prompt, title, extracted_content, ...) → str
    ↓
_callAiWithLooping(generation_prompt, options, ...) → str (JSON)
    ├─> Iteration 1: AI generates JSON
    ├─> Check complete_response flag
    ├─> Extract sections
    ├─> Repair broken JSON if needed
    └─> Loop up to 50 iterations (if incomplete)
    ↓
parseJsonWithModel(response, UnifiedJsonDocument) → UnifiedJsonDocument
    ↓
generationService.renderReport(unifiedJson, outputFormat, ...) → bytes, mimeType
    ↓
AiResponse(content=json.dumps(unifiedJson), metadata=..., documents=[...])
```

#### JSON Accumulation for Iterative Calls

**Problem**: The AI returns JSON strings that can be truncated for large documents.

**Solution**: JSON string accumulation with KPI-based validation.

**State Management**:

```python
class JsonAccumulationState(BaseModel):
    accumulatedJsonString: str                    # Raw accumulated JSON string
    isAccumulationMode: bool                      # True if we're accumulating fragments
    lastParsedResult: Optional[Dict[str, Any]]    # Last successfully parsed result
    allSections: List[Dict[str, Any]]             # Sections extracted so far
    kpis: List[Dict[str, Any]]                    # KPI definitions with current values
```

**Flow Logic**:

```
Phase 1: First Iteration Check
  1. Receive JSON string from AI
  2. Try to parse:
     - SUCCESS + Complete → Extract sections → DONE (no accumulation)
     - FAILURE or INCOMPLETE → Enter accumulation mode

Phase 2: Accumulation Mode (if needed)
  For each iteration:
  1. Receive newFragmentString
  2. Concatenate with overlap handling
  3. Try to parse accumulatedJsonString:
     - SUCCESS → Go to Phase 3 (completion)
     - FAILURE → Continue accumulation
  4. Extract partial sections (for prompt context)
  5. Build continuation context for next prompt
  6. Extract KPI from AI response and validate progression
  7. Keep accumulatedJsonString for next iteration

Phase 3: Completion (when parsing succeeds)
  1. Analyze completeness
  2. Add closing elements if needed
  3. Repair if corrupted
  4. Extract final sections
  5. DONE
```

### Phase 4: Rendering

#### Generation Service: renderReport()

**Signature**:

```python
def renderReport(
    unifiedJson: UnifiedJsonDocument,
    outputFormat: str,
    title: Optional[str] = None,
    ...
) -> Tuple[bytes, str]  # (rendered_bytes, mimeType)
```

**Format Renderers**:
- Each format has its own renderer (DOCX, PDF, XLSX, HTML, etc.)
- Renderers receive a `UnifiedJsonDocument` with its sections structure
- Renderers generate format-specific output

**Unified JSON Structure**:

```json
{
  "metadata": {
    "title": "Document Title",
    "filename": "document.docx"
  },
  "documents": [
    {
      "sections": [
        {
          "id": "section_1",
          "content_type": "heading",
          "title": "Section Title",
          "order": 1,
          "elements": [
            { "text": "Heading Text", "level": 1 }
          ]
        },
        {
          "id": "section_2",
          "content_type": "table",
          "title": "Data Table",
          "order": 2,
          "elements": [
            {
              "caption": "Table Caption",
              "headers": ["Column 1", "Column 2"],
              "rows": [
                ["Value 1", "Value 2"]
              ]
            }
          ]
        }
      ]
    }
  ]
}
```

### Generic vs. Specific Components

#### Generic Components (Breadth)

**Extractors**: One extractor per document type
- `PDFExtractor`
- `DOCXExtractor`
- `XLSXExtractor`
- `ImageExtractor`
- etc.

**Renderers**: One renderer per format
- `DOCXRenderer`
- `PDFRenderer`
- `XLSXRenderer`
- `HTMLRenderer`
- etc.

#### Specific Components (Depth)

**Generators**: One generator per document type (optional)
- `ReportGenerator` (for reports)
- `AnalysisGenerator` (for analyses)
- `DataGenerator` (for data documents)

**Validators**: One validator per content type
- `TableValidator` (for tables)
- `ListValidator` (for lists)
- `TextValidator` (for text)

---

## Standardized Data Models

### Extraction Models

#### ContentPart

Basic unit of extracted content:

```python
class ContentPart(BaseModel):
    id: str = Field(description="Unique content part identifier")
    parentId: Optional[str] = Field(default=None, description="Optional parent content part id")
    label: str = Field(description="Human readable label of the part")
    typeGroup: str = Field(description="Logical type group: text, table, structure, binary, ...")
    mimeType: str = Field(description="MIME type of the part payload")
    data: str = Field(default="", description="Primary data payload, often extracted text")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Arbitrary metadata for the part")
```

#### ContentExtracted

Container for extracted ContentParts:

```python
class ContentExtracted(BaseModel):
    id: str = Field(description="Extraction id or source document id")
    parts: List[ContentPart] = Field(default_factory=list, description="List of extracted parts")
    summary: Optional[Dict[str, Any]] = Field(default=None, description="Optional extraction summary")
```

#### ExtractionOptions

Options for document extraction:

```python
class ExtractionOptions(BaseModel):
    # Core extraction parameters
    prompt: str = Field(description="Extraction prompt for AI processing")
    processDocumentsIndividually: bool = Field(default=True, description="Process each document separately")

    # Image processing parameters
    imageMaxPixels: int = Field(default=1024 * 1024, ge=1, description="Maximum pixels for image processing")
    imageQuality: int = Field(default=85, ge=1, le=100, description="Image quality (1-100)")

    # Merging strategy
    mergeStrategy: MergeStrategy = Field(description="Strategy for merging extraction results")

    # Optional chunking parameters
    chunkAllowed: Optional[bool] = Field(default=None, description="Whether chunking is allowed")
    maxSize: Optional[int] = Field(default=None, description="Maximum size for processing")
    textChunkSize: Optional[int] = Field(default=None, description="Size for text chunks")
    imageChunkSize: Optional[int] = Field(default=None, description="Size for image chunks")
```

### Generation Models

#### UnifiedJsonDocument

Standardized JSON format for document generation:

```python
class UnifiedJsonDocument(BaseModel):
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Document metadata")
    documents: List[DocumentStructure] = Field(default_factory=list, description="List of documents")
    statistics: Optional[Dict[str, Any]] = Field(default=None, description="Document statistics")

class DocumentStructure(BaseModel):
    sections: List[SectionStructure] = Field(default_factory=list, description="List of sections")

class SectionStructure(BaseModel):
    id: str = Field(description="Section identifier")
    content_type: str = Field(description="Content type: heading, paragraph, table, list, etc.")
    title: Optional[str] = Field(default=None, description="Section title")
    order: int = Field(description="Section order")
    elements: List[Dict[str, Any]] = Field(default_factory=list, description="Section elements")
```

#### AiResponse

Uniform response from AI calls:

```python
class AiResponse(BaseModel):
    content: str = Field(description="Response content (JSON string for planning, text for analysis, unified JSON for documents)")
    metadata: Optional[AiResponseMetadata] = Field(None, description="Response metadata")
    documents: Optional[List[DocumentData]] = Field(None, description="Generated documents (only for document generation operations)")

    def toJson(self) -> Dict[str, Any]:
        """Convert AI response content to JSON using enhanced stabilizing failsafe conversion methods"""
```

#### AiCallOptions

Options for AI calls:

```python
class AiCallOptions(BaseModel):
    operationType: OperationTypeEnum = Field(default=OperationTypeEnum.DATA_ANALYSE, description="Type of operation")
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level")
    compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
    compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
    processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately")
    maxCost: Optional[float] = Field(default=None, description="Max cost budget")
    maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
    processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode")
    resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
    safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits")
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation")
    maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch")
```

### Validator Models

#### ValidationMetadata

Metadata for validation (in `ActionDocument`):

```python
# In ActionDocument
validationMetadata: Optional[Dict[str, Any]] = Field(
    None,
    description="Validation metadata for content validation and refinement"
)
```

**Standard structure**:

```python
validationMetadata = {
    "actionName": "ai.process",
    "actionParameters": {
        "resultType": "docx",
        "columnsPerRow": 10
    },
    "extractionMethod": "full",
    "sourceDocuments": ["doc1.pdf", "doc2.pdf"],
    # ... further parameters relevant for validation
}
```

#### ValidationResult

Result of content validation:

```python
{
    "overallSuccess": bool,
    "qualityScore": float,  # 0.0-1.0
    "dataTypeMatch": bool,
    "formatMatch": bool,
    "documentCount": int,
    "criteriaMapping": [
        {
            "index": int,
            "criterion": str,
            "met": bool,
            "reason": str
        }
    ],
    "gapAnalysis": str,
    "gapType": str,  # "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap"
    "structureComparison": {
        "required": Dict[str, Any],
        "found": Dict[str, Any],
        "gap": Dict[str, Any]
    },
    "improvementSuggestions": List[str],
    "validationDetails": [
        {
            "documentName": str,
            "issues": List[str],
            "suggestions": List[str]
        }
    ]
}
```

### Validation for Iterative Calls

#### JSON Accumulation with KPI Validation

**Problem**: For large documents, the validator cannot check the entire document.

**Solution**: KPI-based validation during generation.
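
A minimal sketch of this idea, assuming the model reports a completion percentage (0-100) after each iteration and that generation stops as soon as the percentage drops or rises by less than one point. The function name `acceptedIterations` and the `minIncrement` parameter are hypothetical; they only illustrate the stop rule and are not taken from the codebase.

```python
from typing import List


def acceptedIterations(reportedPercentages: List[int], minIncrement: int = 1) -> int:
    """Count how many iterations pass the progression check before generation stops."""
    lastKpi = 0
    accepted = 0
    for currentKpi in reportedPercentages:
        if currentKpi - lastKpi < minIncrement:
            break  # percentage went down or did not move: stop accumulating
        lastKpi = currentKpi
        accepted += 1
    return accepted


steady = acceptedIterations([20, 45, 70, 100])    # every step makes progress
stalled = acceptedIterations([20, 45, 45, 100])   # third step shows no progress
regressed = acceptedIterations([20, 45, 10])      # third step goes backwards
```

The check needs only the previous and the current percentage, so it stays cheap no matter how large the accumulated JSON string has become.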
**KPI definition**:

```python
kpis: List[Dict[str, Any]] = [
    {
        "id": "table_rows",
        "description": "Number of table rows",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='table')].elements[0].rows",
        "targetValue": 100,
        "currentValue": 45
    },
    {
        "id": "list_items",
        "description": "Number of list items",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='list')].elements[0].items",
        "targetValue": 50,
        "currentValue": 30
    }
]
```

**KPI validation during accumulation**:

```
# In each iteration:
1. Extract KPI from AI response (the AI answers with a percentage)
2. Validate KPI progression:
   - If % goes DOWN → Error (e.g., no data received, started new) → Return False
   - If % doesn't move (increment < 1%) → Error (no progress) → Return False
   - If % goes UP (increment >= 1%) → Good progress → Return True
3. Store KPI in accumulation state
4. Continue accumulation if valid, else stop
```

**KPI question for the AI**:

```
=== PROGRESS INDICATOR ===
Based on the delivered data so far, approximately what percentage (%) of the total
required content has been delivered? Respond with an integer between 0-100.

⚠️ IMPORTANT:
- If percentage goes DOWN in next iteration → Generation will stop (error detected)
- If percentage doesn't increase by at least 1% → Generation will stop (no progress)
- Only continue if percentage increases by 1% or more
```

---

## Validation with KPIs

### Challenge: Large Documents

**Problem**: Documents can be very large (several MB), so the validator cannot check the entire document.

**Solution**: A generic approach to validation based on KPIs.

### KPI-Based Validation

#### 1. KPI Definition

KPIs are created during task definition:

```python
class KpiDefinition(BaseModel):
    id: str = Field(description="KPI identifier")
    description: str = Field(description="Human-readable description")
    jsonPath: str = Field(description="JSONPath expression to extract value")
    targetValue: Union[int, float, str] = Field(description="Target value")
    currentValue: Optional[Union[int, float, str]] = Field(default=None, description="Current value")
    tolerance: Optional[float] = Field(default=0.0, description="Tolerance for numeric values")
```

#### 2. KPI Extraction During Generation

During JSON accumulation, the KPIs are updated continuously:

```python
def extractKpisFromJson(
    jsonData: Dict[str, Any],
    kpiDefinitions: List[KpiDefinition]
) -> List[Dict[str, Any]]:
    """Extract current KPI values from JSON using JSONPath"""
    updatedKpis = []
    for kpi in kpiDefinitions:
        # Extract value using JSONPath (jsonPath and calculateProgress are helpers not shown here)
        currentValue = jsonPath(jsonData, kpi.jsonPath)
        updatedKpis.append({
            "id": kpi.id,
            "description": kpi.description,
            "jsonPath": kpi.jsonPath,
            "targetValue": kpi.targetValue,
            "currentValue": currentValue,
            "progress": calculateProgress(currentValue, kpi.targetValue)
        })
    return updatedKpis
```

#### 3. KPI Validation

```python
def validateKpiProgression(
    accumulationState: JsonAccumulationState,
    currentKpi: int
) -> bool:
    """Validate KPI progression from AI response"""
    # lastKpi: percentage reported in the previous iteration
    # (not among the JsonAccumulationState fields listed earlier in this document)
    lastKpi = accumulationState.lastKpi if accumulationState.lastKpi else 0
    increment = currentKpi - lastKpi

    if increment < 0:
        return False  # Went down - error
    if increment < 1:
        return False  # No progress - error
    return True  # Progress - good
```

#### 4. Content Validator with KPI Support

The Content Validator uses KPIs to validate large documents:

```python
async def validateContent(
    self,
    documents: List[Any],
    intent: Dict[str, Any],
    taskStep: Optional[Any] = None,
    actionName: Optional[str] = None,
    actionParameters: Optional[Dict[str, Any]] = None,
    actionHistory: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """Validates delivered content against user intent using AI"""
    # Extract KPI definitions from taskStep or intent
    kpiDefinitions = extractKpiDefinitions(taskStep, intent)

    # For large documents: Use KPI-based validation
    if isLargeDocument(documents):
        return await self._validateWithKpis(documents, intent, kpiDefinitions, taskStep)

    # For small documents: Use full content validation
    return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory)
```

### Structure-Based Validation

For large documents, only the structure is validated, not the entire content:

```python
def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
    """Summarize JSON document structure for validation"""
    summary = {
        "metadata": {},
        "sections": [],
        "statistics": {}
    }

    # Extract metadata
    metadata = jsonData.get("metadata", {})
    if metadata and isinstance(metadata, dict):
        summary["metadata"] = dict(metadata)

    # Extract sections with statistics
    documents = jsonData.get("documents", [])
    if documents:
        summary["statistics"]["documentCount"] = len(documents)
        if len(documents) > 0:
            doc = documents[0]
            docSections = doc.get("sections", [])
            summary["statistics"]["sectionCount"] = len(docSections)

            # Summarize sections (without full content)
            for section in docSections:
                sectionSummary = {
                    "id": section.get("id"),
                    "content_type": section.get("content_type"),
                    "title": section.get("title"),
                    "order": section.get("order")
                }

                # For tables: extract caption and statistics
                if section.get("content_type") == "table":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        tableElement = elements[0]
                        sectionSummary["caption"] = tableElement.get("caption")
                        headers = tableElement.get("headers", [])
                        rows = tableElement.get("rows", [])
                        sectionSummary["columnCount"] = len(headers)
                        sectionSummary["rowCount"] = len(rows)
                        sectionSummary["headers"] = headers

                # For lists: extract item count
                elif section.get("content_type") == "list":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        listElement = elements[0]
                        items = listElement.get("items", [])
                        sectionSummary["itemCount"] = len(items)

                summary["sections"].append(sectionSummary)

    return summary
```

**Advantages**:
- ✅ Validates the structure without having to inspect the entire content
- ✅ Uses statistics (number of rows, items, etc.) for validation
- ✅ Scales to very large documents (GB range)

---

## Code Structure

### Gateway Architecture

```
gateway/
├── modules/
│   ├── routes/              # API Routes
│   ├── security/            # Security & Middlewares
│   ├── shared/              # Shared Utilities
│   ├── services/            # Business Logic Services
│   │   ├── serviceAi.py                    # AI Service (callAiPlanning, callAiContent)
│   │   ├── serviceDocumentExtraction.py    # Extraction Service
│   │   └── serviceDocumentGeneration.py    # Generation Service
│   ├── interfaces/          # Interface Layer
│   │   └── interfaceAiObjects.py           # AI Objects Interface
│   ├── connectors/          # External Connectors
│   │   ├── connectorAiOpenai.py
│   │   ├── connectorAiAnthropic.py
│   │   └── ...
│   ├── datamodels/          # Pydantic Models
│   │   ├── datamodelAi.py
│   │   ├── datamodelExtraction.py
│   │   ├── datamodelWorkflow.py
│   │   └── ...
│   └── workflows/           # Workflow Processing
│       ├── processing/
│       │   ├── core/
│       │   │   └── validator.py            # Workflow Validator
│       │   └── adaptive/
│       │       └── contentValidator.py     # Content Validator
│       └── methods/
│           ├── methodBase.py
│           ├── methodAi/
│           │   ├── methodAi.py
│           │   └── actions/
│           │       ├── process.py
│           │       └── ...
│           └── methodContext/
│               └── actions/
│                   ├── extractContent.py
│                   └── neutralizeData.py
```

### Service Dependency Hierarchy

```
aiService (independent)
  └─> No dependencies

generationService (independent)
  └─> No dependencies

extractionService (independent)
  └─> No dependencies

chatService (depends on aiService, generationService)
  ├─> Uses: aiService (for AI calls)
  └─> Uses: generationService (for document rendering)

workflowProcessor (depends on all services)
  ├─> Uses: chatService
  ├─> Uses: aiService
  ├─> Uses: generationService
  └─> Uses: extractionService
```

### Important Files

#### AI Service

- **`gateway/modules/services/serviceAi.py`**: Central AI Service implementation
  - `callAiPlanning()`: For planning operations
  - `callAiContent()`: For content processing (unified)

#### Extraction Service

- **`gateway/modules/services/serviceDocumentExtraction.py`**: Extraction Service
  - `extractContent()`: Phase 1 - Pure extraction
  - `processDocumentsPerChunk()`: Phase 2 - AI-based extraction with chunking

#### Generation Service

- **`gateway/modules/services/serviceDocumentGeneration.py`**: Generation Service
  - `renderReport()`: Renders Unified JSON to the target format

#### Validators

- **`gateway/modules/workflows/processing/core/validator.py`**: Workflow Validator
  - `validateTask()`: Validates the task structure
  - `validateAction()`: Validates the action structure
- **`gateway/modules/workflows/processing/adaptive/contentValidator.py`**: Content Validator
  - `validateContent()`: Validates generated content against the user intent
  - `_validateWithAI()`: AI-based validation
  - `_summarizeJsonStructure()`: Structure summary for large documents

#### Data Models

- **`gateway/modules/datamodels/datamodelAi.py`**: AI Models
  - `AiCallOptions`, `AiResponse`, `JsonAccumulationState`, etc.
- **`gateway/modules/datamodels/datamodelExtraction.py`**: Extraction Models
  - `ContentPart`, `ContentExtracted`, `ExtractionOptions`, etc.
- **`gateway/modules/datamodels/datamodelWorkflow.py`**: Workflow Models
  - `ChatWorkflow`, `TaskContext`, `ActionDefinition`, etc.

---

## Further Important Topics

### 1. Action Registry Pattern

Each action function is defined in the same module as its parameter Pydantic model:

```python
# In methodAi/actions/process.py
class AiProcessParameters(BaseModel):
    """Parameters for AI processing action"""
    aiPrompt: str = Field(description="AI instruction prompt")
    contentParts: Optional[List[ContentPart]] = Field(None, description="Already-extracted content parts")
    resultType: str = Field(default="txt", description="Output file extension")

async def process(parameters: AiProcessParameters) -> ActionResult:
    """Execute AI processing action"""
    # Implementation...
```

**Advantages**:
- ✅ Type safety: Parameters are validated at runtime
- ✅ Co-location: Action and parameters are defined together
- ✅ Extensibility: New actions can be added easily

### 2. Validation Metadata Pattern

All actions that return an `ActionDocument` MUST include `validationMetadata`:

```python
# In MethodBase
def _createValidationMetadata(self, actionName: str, **kwargs) -> Dict[str, Any]:
    """Create standardized validationMetadata for ActionDocument instances"""
    return {
        "actionName": actionName,
        **kwargs  # Action-specific parameters
    }

# In Action
validationMetadata = self._createValidationMetadata(
    actionName="ai.process",
    resultType="docx",
    columnsPerRow=10,
    extractionMethod="full"
)

return ActionResult(
    success=True,
    documents=[
        ActionDocument(
            documentName="report.docx",
            documentData=renderedBytes,
            mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            validationMetadata=validationMetadata  # REQUIRED
        )
    ]
)
```

### 3. Document Reference System

Documents are referenced through typed references:

```python
class DocumentReference(BaseModel):
    """Base class for document references"""
    pass

class DocumentListReference(DocumentReference):
    """Reference to a document list via message label"""
    messageId: Optional[str] = None
    label: str

    def to_string(self) -> str:
        if self.messageId:
            return f"docList:{self.messageId}:{self.label}"
        return f"docList:{self.label}"

class DocumentItemReference(DocumentReference):
    """Reference to a specific document item"""
    documentId: str
    fileName: Optional[str] = None

    def to_string(self) -> str:
        if self.fileName:
            return f"docItem:{self.documentId}:{self.fileName}"
        return f"docItem:{self.documentId}"
```

**Usage**:

```python
# In Action Definition
documentList = DocumentReferenceList(
    references=[
        DocumentListReference(messageId="msg_123", label="task1_results"),
        DocumentItemReference(documentId="doc_456", fileName="report.pdf")
    ]
)

# Convert to string list for action parameters
stringRefs = documentList.to_string_list()
# ["docList:msg_123:task1_results", "docItem:doc_456:report.pdf"]
```

### 4. Document Persistence Strategy

#### Phase 1: Within Running Task

- Documents are passed directly as `ActionResult` + `ActionDocument`
- **NO** ChatDocuments are created
- Documents are ephemeral but available within the task

#### Phase 2: Between Tasks/Rounds

- Documents are persisted via `ChatMessage` + `ChatDocuments`
- Required for `docList:` references
- `documentsLabel` is set for later referencing

#### Phase 3: User Delivery

- Complete `ChatMessage` (in the user's language) + `ChatDocuments`
- For user-facing workflows

### 5. Error Handling and Retry Logic

#### Model Failover

- If a model fails, the next model from the failover list is selected automatically
- The chunk size is recalculated for the new model
- Processing continues with the new model

#### JSON Repair

- Broken JSON strings are passed to `repairBrokenJson()`
- It attempts to repair the JSON structure
- If the repair fails, processing continues with the previously accumulated string

### 6. Progress Logging

Progress is logged at several levels:

```python
# In AI Service
self.services.chat.progressLogStart(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    message="Starting AI call..."
)

# ... processing ...

self.services.chat.progressLogFinish(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    success=True
)
```

### 7. Debugging and Tracing

#### Debug Files

- AI prompts are stored as debug files
- JSON responses are stored as debug files
- The debug type is used for categorization

#### JSON Extraction Utilities

- `extractJsonString()`: Extracts JSON from text with code fences
- `tryParseJson()`: Safe JSON parsing with error handling
- `repairBrokenJson()`: Repairs broken or incomplete JSON strings

---

## Summary

The PowerOn AI system uses:

1. **Dynamic workflow** with a Round → Task → Action hierarchy
2. **Standardized data models** for all components
3. **Four-phase model**: Extraction → Neutralization → Generation → Rendering
4. **KPI-based validation** for large documents
5. **Model-aware chunking** for optimal resource usage
6. **JSON accumulation** for iterative generation
7. **Type-safe parameter validation** through Pydantic models
8.
**Struktur-basierte Validierung** für große Dokumente Alle Komponenten sind generisch und erweiterbar, sodass neue Actions, Extractors, Renderers und Validators einfach hinzugefügt werden können.
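
To illustrate the JSON extraction utilities named in the Debugging and Tracing section, here is a minimal sketch of how `extractJsonString()` and `tryParseJson()` might behave. The signatures, return types, and the `FENCE` helper are assumptions made for illustration. This is not the project's actual implementation, and `repairBrokenJson()` is deliberately left out.

```python
import json
import re
from typing import Any, Optional, Tuple

# Hypothetical sketch: the real PowerOn signatures may differ.
FENCE = "`" * 3  # Markdown code-fence delimiter, built at runtime

# Matches the first fenced block, with or without a "json" language tag.
_FENCE_PATTERN = re.compile(
    re.escape(FENCE) + r"(?:json)?\s*(.*?)" + re.escape(FENCE),
    re.DOTALL | re.IGNORECASE,
)


def extractJsonString(text: str) -> str:
    """Return the content of the first fenced block, or the stripped text if none exists."""
    match = _FENCE_PATTERN.search(text)
    if match:
        return match.group(1).strip()
    return text.strip()


def tryParseJson(text: str) -> Tuple[Optional[Any], Optional[str]]:
    """Parse JSON without raising: (value, None) on success, (None, message) on failure."""
    try:
        return json.loads(extractJsonString(text)), None
    except json.JSONDecodeError as exc:
        return None, str(exc)


# Example: an AI reply that wraps its JSON payload in a code fence.
reply = "Here is the result:\n" + FENCE + 'json\n{"status": "ok", "items": [1, 2]}\n' + FENCE
value, error = tryParseJson(reply)
```

Returning an error message instead of raising lets the caller decide whether to attempt a repair or continue with the previously accumulated string, as described under JSON Repair.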