wiki/appdoc/doc_ai_system_validator.md
2025-12-25 00:09:38 +01:00

PowerOn AI System Documentation - Validators and Workflow Mechanisms

Table of Contents

  1. Overview
  2. Dynamic Workflow
  3. Generic AI Call Flow
  4. Standardized Data Models
  5. Validation with KPIs
  6. Code Structure
  7. Further Important Topics

Overview

The PowerOn AI system is a multi-layered system for processing documents and generating content, with integrated validation mechanisms. It uses a dynamic workflow approach with standardized data models and multi-stage validators.

Core Components

  • Workflow Processor: Orchestrates the execution of tasks and actions
  • AI Service: Central interface for all AI operations
  • Extraction Service: Extracts content from documents
  • Generation Service: Generates and renders documents
  • Content Validator: Validates generated content against the user intent
  • Workflow Validator: Validates workflow structures

Dynamic Workflow

Workflow Architecture

The dynamic workflow is based on a Round → Task → Action hierarchy:

User Request
  ↓
Round (each user input starts a new round)
  ↓
Task (a round may contain several tasks)
  ↓
Action (a task may contain several actions)

Standardized Elements

1. ChatWorkflow (State Management)

The ChatWorkflow object manages the entire execution state:

class ChatWorkflow(BaseModel):
    # Execution state
    currentRound: int = 0          # Current round
    currentTask: int = 0           # Current task within the round
    currentAction: int = 0         # Current action within the task
    
    def incrementRound(self):
        """Increment round when new user input received"""
        self.currentRound += 1
        self.currentTask = 0
        self.currentAction = 0
    
    def incrementTask(self):
        """Increment task when starting new task"""
        self.currentTask += 1
        self.currentAction = 0
    
    def incrementAction(self):
        """Increment action when executing new action"""
        self.currentAction += 1

Important: All execution state is managed in the ChatWorkflow object rather than being passed through the call chain as separate parameters.
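
The reset behavior of the three counters can be traced with a small, dependency-free sketch. It uses a plain dataclass named WorkflowCounters as a stand-in for ChatWorkflow (the stand-in and its position() helper are assumptions made for illustration, not part of the system):

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass
class WorkflowCounters:
    """Hypothetical stand-in for the counter part of ChatWorkflow."""
    currentRound: int = 0
    currentTask: int = 0
    currentAction: int = 0

    def incrementRound(self) -> None:
        # A new user input starts a new round and resets task and action.
        self.currentRound += 1
        self.currentTask = 0
        self.currentAction = 0

    def incrementTask(self) -> None:
        # A new task resets only the action counter.
        self.currentTask += 1
        self.currentAction = 0

    def incrementAction(self) -> None:
        self.currentAction += 1

    def position(self) -> Tuple[int, int, int]:
        # Illustrative helper: (round, task, action) as one tuple.
        return (self.currentRound, self.currentTask, self.currentAction)


state = WorkflowCounters()
state.incrementRound()            # first user input
state.incrementTask()
state.incrementAction()
state.incrementAction()
afterTwoActions = state.position()

state.incrementTask()             # starting the next task resets the action counter
afterSecondTask = state.position()

state.incrementRound()            # next user input resets task and action
afterSecondRound = state.position()
```

Because every counter lives on one object, a function deep in the call chain only needs the workflow instance to know where execution currently stands.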

2. TaskContext (Context Management)

The TaskContext holds all information needed to execute a task:

class TaskContext(BaseModel):
    # Stage 1 context fields
    actionObjective: Optional[str] = None
    parametersContext: Optional[str] = None
    learnings: List[str] = Field(default_factory=list)
    stage1Selection: Optional[Dict[str, Any]] = None
    
    def updateFromSelection(self, selection: ActionDefinition):
        """Update context from Stage 1 selection"""
        self.actionObjective = selection.actionObjective
        self.parametersContext = selection.parametersContext
        self.learnings = selection.learnings
        self.stage1Selection = selection.model_dump()

3. ActionDefinition (Action Selection and Parameters)

The ActionDefinition combines action selection and parameters:

class ActionDefinition(BaseModel):
    # Core action selection (Stage 1)
    action: str = Field(description="Compound action name (method.action)")
    actionObjective: str = Field(description="Objective for this action")
    parametersContext: Optional[str] = Field(None, description="Context for parameter generation")
    learnings: List[str] = Field(default_factory=list, description="Learnings from previous actions")
    
    # Resources (ALWAYS defined in Stage 1 if action needs them)
    documentList: Optional[DocumentReferenceList] = Field(None, description="Document references")
    connectionReference: Optional[str] = Field(None, description="Connection reference")
    
    # Parameters (may be defined in Stage 1 OR Stage 2)
    parameters: Optional[Dict[str, Any]] = Field(None, description="Action-specific parameters")
    
    def needsStage2(self) -> bool:
        """Determine if Stage 2 parameter generation is needed"""
        return self.parameters is None

Workflow Sequence

High-Level Flow

User Request (prompt + documents)
  ↓
Request Reception → RequestContext
  ↓
Complexity Detection → "simple" | "moderate" | "complex"
  ↓
[If simple] Fast Path
  ├─> fastPathExecute() → ActionResult
  └─> Return to user (5-15s)
  ↓
[If complex] Full Workflow
  ├─> initialUnderstanding() → UnderstandingResult
  ├─> Create TaskDefinition[] from UnderstandingResult
  ├─> For each task:
  │   ├─> executeTask(TaskDefinition) → TaskResult
  │   │   ├─> Web Research (if needed)
  │   │   ├─> Document Extraction (separate action)
  │   │   ├─> Information Analysis (AI with extracted content)
  │   │   ├─> Content Generation (AI with extracted content)
  │   │   └─> Format Rendering (unified JSON → format)
  │   └─> persistTaskResult() → ChatMessage
  └─> Return to user (30-120s)

Action-Level Flow (Within a Task)

modeDynamic.executeTask(taskStep, workflow, context)
  ↓
_planSelect(context) → ActionDefinition
  ├─> aiService.callAiPlanning() → AiResponse
  └─> parseJsonWithModel() → ActionDefinition
  ↓
_actExecute(context, selection, ...) → ActionResult
  ├─> context.updateFromSelection(selection)
  ├─> If selection.needsStage2() (Stage 2 needed):
  │   ├─> aiService.callAiPlanning() → AiResponse
  │   └─> parseJsonWithModel() → Update selection with parameters
  └─> actionExecutor.executeSingleAction() → ActionResult
      └─> methodAi.process(parameters) → ActionResult
          └─> aiService.callAiContent(prompt, contentParts=...) → AiResponse
  ↓
_observeBuild() → Observation
  ↓
_refineDecide() → ReviewResult

Validation Mechanisms in the Workflow

1. Workflow Validator

Validates the structure of task plans and actions:

class WorkflowValidator:
    def validateTask(self, taskPlan: Dict[str, Any]) -> bool:
        """Validate task plan structure"""
        # Checks:
        # - Task structure (id, objective, successCriteria)
        # - Dependencies (no cyclic dependencies)
        # - Unique task IDs
        
    def validateAction(self, actions: List[Dict[str, Any]], context) -> bool:
        """Validate action structure"""
        # Checks:
        # - Action structure (action, parameters, resultLabel)
        # - Compound action format (method.action)
        # - Result labels start with 'round'
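
The checks listed in the comments could be implemented roughly as follows. This is a minimal sketch, not the actual validator: the "dependencies" key on a task and all three function names are assumptions, while "id", "action" and "resultLabel" are the field names mentioned above.

```python
import re
from typing import Any, Dict, List

# Compound action names have the form "method.action", e.g. "ai.process".
COMPOUND_ACTION = re.compile(r"[A-Za-z_]\w*\.[A-Za-z_]\w*")


def hasUniqueTaskIds(tasks: List[Dict[str, Any]]) -> bool:
    ids = [task.get("id") for task in tasks]
    return len(ids) == len(set(ids))


def hasCyclicDependencies(tasks: List[Dict[str, Any]]) -> bool:
    """Depth-first search over the (assumed) 'dependencies' lists."""
    graph = {task["id"]: list(task.get("dependencies", [])) for task in tasks}
    visiting, done = set(), set()

    def visit(taskId: str) -> bool:
        if taskId in done:
            return False
        if taskId in visiting:
            return True          # reached a task that is still on the stack
        visiting.add(taskId)
        for dependency in graph.get(taskId, []):
            if visit(dependency):
                return True
        visiting.discard(taskId)
        done.add(taskId)
        return False

    return any(visit(taskId) for taskId in graph)


def isValidAction(action: Dict[str, Any]) -> bool:
    name = action.get("action", "")
    label = action.get("resultLabel", "")
    return bool(COMPOUND_ACTION.fullmatch(name)) and label.startswith("round")
```

A plan would pass only if hasUniqueTaskIds() is true, hasCyclicDependencies() is false, and every action satisfies isValidAction().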

2. Parameter Validation

Parameters are validated against action schemas:

# In MethodBase._validateParameters()
def _validateParameters(self, parameters: Dict[str, Any], 
                       paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
    """Validate parameters against definitions"""
    validated = {}
    
    for paramName, paramDef in paramDefs.items():
        value = parameters.get(paramName)
        
        # Check required
        if paramDef.required and value is None:
            raise ValueError(f"Required parameter '{paramName}' is missing")
        
        # Use default if not provided
        if value is None and paramDef.default is not None:
            value = paramDef.default
        
        # Type validation
        if value is not None:
            value = self._validateType(value, paramDef.type)
        
        # Custom validation rules
        if paramDef.validation and value is not None:
            self._applyValidationRules(value, paramDef.validation)
        
        validated[paramName] = value
    
    return validated

Generischer AI Call Flow

Overview: Extraction → Neutralization → Generation → Rendering

The system follows a four-phase model:

1. EXTRACTION: Documents → ContentParts
2. NEUTRALIZATION: ContentParts → Neutralized ContentParts (optional)
3. GENERATION: ContentParts + Prompt → Unified JSON
4. RENDERING: Unified JSON → Format (DOCX, PDF, XLSX, etc.)

Phase 1: Extraction

Separate Extraction Action

Important: Extraction is a separate action, not part of the AI call!

# Extraction Action: document.extractContent
async def extractContent(
    parameters: ExtractContentParameters  # Pydantic: ExtractContentParameters
) -> ActionResult                         # Pydantic: ActionResult

Parameters Model:

class ExtractContentParameters(BaseModel):
    documentList: DocumentReferenceList = Field(description="Document references")
    extractionOptions: Optional[ExtractionOptions] = Field(
        None,
        description="Extraction options (dynamic, not hardcoded)"
    )

Returns: ActionResult with an ActionDocument containing ContentExtracted objects.

Two-Phase Extraction Model

Phase 1: Pure Content Extraction (No AI)

Method: extractionService.extractContent(documents, options)

Characteristics:

  • No size limits - Extracts complete documents regardless of size
  • No AI calls - Purely format-specific extraction (PDF, DOCX, etc.)
  • No chunking - The full content is extracted as-is into ContentPart objects
  • Format-aware - Uses the matching extractor for each document type

Output: List[ContentExtracted], each of which contains a List[ContentPart]

Phase 2: AI-Based Content Extraction (with Intelligent Model-Aware Chunking)

Method: extractionService.processDocumentsPerChunk(documents, prompt, aiObjects, options)

Characteristics:

  • Model-aware chunking - Chunk size is computed dynamically from the selected model
  • No fixed sizes - The only limits are the model's contextLength and maxTokens
  • Per-call model selection - Each AI call can use a different model (failover)
  • Dynamic chunking - Chunking is recomputed for each model in the failover chain

Chunking calculation (per model):

modelContextTokens = model.contextLength      # Model's total context window
modelMaxOutputTokens = model.maxTokens        # Model's max output tokens
promptSize = len(prompt.encode('utf-8'))     # Input prompt size in bytes

# Calculate prompt tokens (approximate: 1 token ≈ 4 bytes)
promptTokens = promptSize / 4

# Reserve tokens for:
systemMessageTokens = 10                      # System message overhead
outputTokens = modelMaxOutputTokens           # Output reservation
messageOverheadTokens = 100                   # JSON/message structure overhead

totalReservedTokens = (promptTokens + systemMessageTokens +
                       messageOverheadTokens + outputTokens)

# Calculate remaining context available for content:
remainingContextTokens = modelContextTokens - totalReservedTokens

# Available for content (use 80% of the remainder, i.e. a 20% safety margin):
availableContentTokens = int(remainingContextTokens * 0.8)
availableContentBytes = availableContentTokens * 4  # Convert back to bytes

# Chunk size (70% of available for text, 80% for images):
textChunkSize = int(availableContentBytes * 0.7)
imageChunkSize = int(availableContentBytes * 0.8)
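
To make the arithmetic concrete, the calculation can be wrapped in a function and run with sample numbers. This is a sketch under assumptions: the function name computeChunkSizes, the clamp to zero for models whose context is too small, and the sample model values (128000 context tokens, 4096 output tokens, 2000-byte prompt) are illustrative and not taken from the system.

```python
from typing import Dict


def computeChunkSizes(contextLength: int, maxTokens: int, promptBytes: int) -> Dict[str, int]:
    """Mirror of the per-model chunking calculation (1 token ≈ 4 bytes)."""
    promptTokens = promptBytes / 4
    systemMessageTokens = 10
    messageOverheadTokens = 100
    totalReservedTokens = (promptTokens + systemMessageTokens +
                           messageOverheadTokens + maxTokens)

    # Assumption: never report a negative budget if the reservation exceeds the context.
    remainingContextTokens = max(0, contextLength - totalReservedTokens)

    availableContentTokens = int(remainingContextTokens * 0.8)
    availableContentBytes = availableContentTokens * 4
    return {
        "availableContentBytes": availableContentBytes,
        "textChunkSize": int(availableContentBytes * 0.7),
        "imageChunkSize": int(availableContentBytes * 0.8),
    }


# Hypothetical model: 128k context window, 4096 output tokens, 2000-byte prompt.
example = computeChunkSizes(contextLength=128000, maxTokens=4096, promptBytes=2000)
print(example)
```

With these inputs, 4706 tokens are reserved (500 prompt + 10 system + 100 overhead + 4096 output), leaving 123294 tokens, of which 80% are made available for content.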

Pipeline Architecture:

for each ContentPart:
    # Initialize pipeline state
    processedChunks = []  # Already processed chunks
    remainingContent = contentPart.data  # Content not yet chunked/processed
    
    while remainingContent:
        1. Model Selection: Select best model for remaining content
        2. Calculate chunk size for CURRENT model
        3. Create NEXT chunk on-demand (lazy chunking)
        4. Process chunk with AI
        5. Update pipeline state
        6. If model fails: Select next model, recalculate chunk size
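
The pipeline loop can be sketched in runnable form as follows. This is a simplified illustration, not the service's implementation: ModelStub, processLazily and the callModel callback are hypothetical names, each model carries a precomputed chunk size, and after a failure the loop stays on the next model in the list instead of re-running a full model selection.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ModelStub:
    """Hypothetical model descriptor with a precomputed chunk size."""
    name: str
    chunkSize: int


def processLazily(content: str,
                  models: List[ModelStub],
                  callModel: Callable[[ModelStub, str], str]) -> List[str]:
    processedChunks: List[str] = []
    remainingContent = content
    modelIndex = 0

    while remainingContent:
        if modelIndex >= len(models):
            raise RuntimeError("All models in the failover chain failed")
        model = models[modelIndex]
        if model.chunkSize <= 0:
            raise ValueError(f"Model {model.name} has no room for content")

        # Lazy chunking: cut the next chunk only now, sized for the current model.
        chunk = remainingContent[:model.chunkSize]
        try:
            result = callModel(model, chunk)
        except Exception:
            # Failover: move to the next model; the chunk is re-cut with its size.
            modelIndex += 1
            continue

        processedChunks.append(result)
        remainingContent = remainingContent[len(chunk):]

    return processedChunks
```

Because chunks are cut on demand, a failover never has to re-split content that was already sized for a model that is no longer in use.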

Phase 2: Neutralization (Optional)

Neutralization Action

# Neutralization Action: context.neutralizeData
async def neutralizeData(
    parameters: NeutralizeDataParameters
) -> ActionResult

Purpose: Removes personal/sensitive data from ContentParts before generation.

Characteristics:

  • Optional - only when needed
  • Uses AI to identify and neutralize sensitive data
  • Preserves the structure of the ContentParts

Phase 3: Generation

AI Service: callAiContent()

Signature:

async def callAiContent(
    prompt: str,                                        # Built-in: str
    contentParts: Optional[List[ContentPart]] = None,   # Pydantic: List[ContentPart]
    *,                                                  # Keyword-only below, so a required parameter may follow a defaulted one
    options: AiCallOptions,                             # Pydantic: AiCallOptions (REQUIRED)
    outputFormat: Optional[str] = None,                 # Built-in: str (for document generation)
    title: Optional[str] = None                         # Built-in: str (for document generation)
) -> AiResponse                                         # Pydantic: AiResponse

Important:

  • ContentParts only - Documents must be extracted beforehand
  • REQUIRED: options.operationType must be set
  • No extraction logic - callAiContent() performs no extraction of its own

Operation Types:

class OperationTypeEnum(str, Enum):
    PLAN = "plan"                          # Planning operations
    DATA_ANALYSE = "dataAnalyse"          # Data analysis
    DATA_GENERATE = "dataGenerate"        # Data generation
    DATA_EXTRACT = "dataExtract"          # Data extraction
    IMAGE_ANALYSE = "imageAnalyse"        # Image analysis
    IMAGE_GENERATE = "imageGenerate"      # Image generation
    WEB_SEARCH = "webSearch"              # Web search (returns URLs)
    WEB_CRAWL = "webCrawl"                # Web crawl (returns content)

Call Flow for Document Generation:

callAiContent(prompt, contentParts, options, outputFormat, title)
  ↓
_preparePromptWithContentParts(prompt, contentParts) → str
  ↓
buildGenerationPrompt(outputFormat, prompt, title, extracted_content, ...) → str
  ↓
_callAiWithLooping(generation_prompt, options, ...) → str (JSON)
  ├─> Iteration 1: AI generates JSON
  ├─> Check complete_response flag
  ├─> Extract sections
  ├─> Repair broken JSON if needed
  └─> Loop up to 50 iterations (if incomplete)
  ↓
parseJsonWithModel(response, UnifiedJsonDocument) → UnifiedJsonDocument
  ↓
generationService.renderReport(unifiedJson, outputFormat, ...) → bytes, mimeType
  ↓
AiResponse(content=json.dumps(unifiedJson), metadata=..., documents=[...])

JSON Accumulation for Iterative Calls

Problem: The AI returns JSON strings that can be truncated for large documents.

Solution: JSON string accumulation with KPI-based validation.

State Management:

class JsonAccumulationState(BaseModel):
    accumulatedJsonString: str  # Raw accumulated JSON string
    isAccumulationMode: bool   # True if we're accumulating fragments
    lastParsedResult: Optional[Dict[str, Any]]  # Last successfully parsed result
    allSections: List[Dict[str, Any]]  # Sections extracted so far
    kpis: List[Dict[str, Any]]  # KPI definitions with current values

Flow Logic:

Phase 1: First Iteration Check
  1. Receive JSON string from AI
  2. Try to parse:
     - SUCCESS + Complete → Extract sections → DONE (no accumulation)
     - FAILURE or INCOMPLETE → Enter accumulation mode

Phase 2: Accumulation Mode (if needed)
  For each iteration:
    1. Receive newFragmentString
    2. Concatenate with overlap handling
    3. Try to parse accumulatedJsonString:
       - SUCCESS → Go to Phase 3 (completion)
       - FAILURE → Continue accumulation
    4. Extract partial sections (for prompt context)
    5. Build continuation context for next prompt
    6. Extract KPI from AI response and validate progression
    7. Keep accumulatedJsonString for next iteration

Phase 3: Completion (when parsing succeeds)
  1. Analyze completeness
  2. Add closing elements if needed
  3. Repair if corrupted
  4. Extract final sections
  5. DONE
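
Steps 2 and 3 of the accumulation phase (concatenate with overlap handling, then try to parse) can be illustrated as follows. This is a sketch under assumptions: mergeWithOverlap and accumulate are hypothetical names, and the overlap rule (longest suffix of the accumulated string that equals a prefix of the new fragment) is one plausible heuristic, not necessarily the one the system uses. It can misfire when a fragment legitimately begins with characters that also end the accumulated string.

```python
import json
from typing import Any, Optional, Tuple


def mergeWithOverlap(accumulated: str, fragment: str) -> str:
    """Append fragment, dropping the longest prefix that repeats the tail of accumulated."""
    maxOverlap = min(len(accumulated), len(fragment))
    for size in range(maxOverlap, 0, -1):
        if accumulated.endswith(fragment[:size]):
            return accumulated + fragment[size:]
    return accumulated + fragment


def accumulate(accumulated: str, fragment: str) -> Tuple[str, Optional[Any]]:
    """Return the merged string and the parsed JSON, or None if it does not parse yet."""
    merged = mergeWithOverlap(accumulated, fragment)
    try:
        return merged, json.loads(merged)
    except json.JSONDecodeError:
        return merged, None   # stay in accumulation mode


# Two continuation fragments that each repeat the end of the previous output.
text, parsed = accumulate('{"rows": [1, 2', '2, 3')
firstParsed = parsed
text, parsed = accumulate(text, ', 3, 4]}')
finalParsed = parsed
```

As long as parsing fails, the merged string is carried into the next iteration; once it parses, the flow moves on to the completion phase.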

Phase 4: Rendering

Generation Service: renderReport()

Signature:

def renderReport(
    unifiedJson: UnifiedJsonDocument,
    outputFormat: str,
    title: Optional[str] = None,
    ...
) -> Tuple[bytes, str]  # (rendered_bytes, mimeType)

Format Renderer:

  • Each format has its own renderer (DOCX, PDF, XLSX, HTML, etc.)
  • Renderers receive a UnifiedJsonDocument with its sections structure
  • Renderers produce format-specific output

Unified JSON Structure:

{
  "metadata": {
    "title": "Document Title",
    "filename": "document.docx"
  },
  "documents": [
    {
      "sections": [
        {
          "id": "section_1",
          "content_type": "heading",
          "title": "Section Title",
          "order": 1,
          "elements": [
            {
              "text": "Heading Text",
              "level": 1
            }
          ]
        },
        {
          "id": "section_2",
          "content_type": "table",
          "title": "Data Table",
          "order": 2,
          "elements": [
            {
              "caption": "Table Caption",
              "headers": ["Column 1", "Column 2"],
              "rows": [
                ["Value 1", "Value 2"]
              ]
            }
          ]
        }
      ]
    }
  ]
}
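
To show how a renderer might walk this structure, here is a minimal sketch that turns heading and table sections into Markdown text. The function renderMarkdown is hypothetical and is not one of the system's renderers; it handles only the two content types shown above and works on a plain dict rather than on the UnifiedJsonDocument model.

```python
from typing import Any, Dict, List


def renderMarkdown(unifiedJson: Dict[str, Any]) -> str:
    """Render heading and table sections of a unified JSON dict as Markdown."""
    lines: List[str] = []
    for document in unifiedJson.get("documents", []):
        # Sections are emitted in the sequence given by their "order" field.
        sections = sorted(document.get("sections", []), key=lambda s: s.get("order", 0))
        for section in sections:
            contentType = section.get("content_type")
            for element in section.get("elements", []):
                if contentType == "heading":
                    level = int(element.get("level", 1))
                    lines.append("#" * level + " " + element.get("text", ""))
                elif contentType == "table":
                    headers = element.get("headers", [])
                    lines.append("| " + " | ".join(headers) + " |")
                    lines.append("| " + " | ".join("---" for _ in headers) + " |")
                    for row in element.get("rows", []):
                        lines.append("| " + " | ".join(str(cell) for cell in row) + " |")
    return "\n".join(lines)


sample = {
    "metadata": {"title": "Document Title", "filename": "document.docx"},
    "documents": [{"sections": [
        {"id": "section_2", "content_type": "table", "title": "Data Table", "order": 2,
         "elements": [{"caption": "Table Caption",
                       "headers": ["Column 1", "Column 2"],
                       "rows": [["Value 1", "Value 2"]]}]},
        {"id": "section_1", "content_type": "heading", "title": "Section Title", "order": 1,
         "elements": [{"text": "Heading Text", "level": 1}]},
    ]}],
}
rendered = renderMarkdown(sample)
```

The sample lists the table section first on purpose: the renderer relies on the order field, not on list position.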

Generic vs. Specific Components

Generic Components (Breadth)

Extractors: One extractor per document type

  • PDFExtractor
  • DOCXExtractor
  • XLSXExtractor
  • ImageExtractor
  • etc.

Renderers: One renderer per format

  • DOCXRenderer
  • PDFRenderer
  • XLSXRenderer
  • HTMLRenderer
  • etc.

Specific Components (Depth)

Generators: One generator per document type (optional)

  • ReportGenerator (for reports)
  • AnalysisGenerator (for analyses)
  • DataGenerator (for data documents)

Validators: One validator per content type

  • TableValidator (for tables)
  • ListValidator (for lists)
  • TextValidator (for text)

Standardized Data Models

Extraction Models

ContentPart

Basic unit of extracted content:

class ContentPart(BaseModel):
    id: str = Field(description="Unique content part identifier")
    parentId: Optional[str] = Field(default=None, description="Optional parent content part id")
    label: str = Field(description="Human readable label of the part")
    typeGroup: str = Field(description="Logical type group: text, table, structure, binary, ...")
    mimeType: str = Field(description="MIME type of the part payload")
    data: str = Field(default="", description="Primary data payload, often extracted text")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Arbitrary metadata for the part")

ContentExtracted

Container for extracted ContentParts:

class ContentExtracted(BaseModel):
    id: str = Field(description="Extraction id or source document id")
    parts: List[ContentPart] = Field(default_factory=list, description="List of extracted parts")
    summary: Optional[Dict[str, Any]] = Field(default=None, description="Optional extraction summary")

ExtractionOptions

Options for document extraction:

class ExtractionOptions(BaseModel):
    # Core extraction parameters
    prompt: str = Field(description="Extraction prompt for AI processing")
    processDocumentsIndividually: bool = Field(default=True, description="Process each document separately")
    
    # Image processing parameters
    imageMaxPixels: int = Field(default=1024 * 1024, ge=1, description="Maximum pixels for image processing")
    imageQuality: int = Field(default=85, ge=1, le=100, description="Image quality (1-100)")
    
    # Merging strategy
    mergeStrategy: MergeStrategy = Field(description="Strategy for merging extraction results")
    
    # Optional chunking parameters
    chunkAllowed: Optional[bool] = Field(default=None, description="Whether chunking is allowed")
    maxSize: Optional[int] = Field(default=None, description="Maximum size for processing")
    textChunkSize: Optional[int] = Field(default=None, description="Size for text chunks")
    imageChunkSize: Optional[int] = Field(default=None, description="Size for image chunks")

Generation Models

UnifiedJsonDocument

Standardized JSON format for document generation:

class UnifiedJsonDocument(BaseModel):
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Document metadata")
    documents: List[DocumentStructure] = Field(default_factory=list, description="List of documents")
    statistics: Optional[Dict[str, Any]] = Field(default=None, description="Document statistics")

class DocumentStructure(BaseModel):
    sections: List[SectionStructure] = Field(default_factory=list, description="List of sections")

class SectionStructure(BaseModel):
    id: str = Field(description="Section identifier")
    content_type: str = Field(description="Content type: heading, paragraph, table, list, etc.")
    title: Optional[str] = Field(default=None, description="Section title")
    order: int = Field(description="Section order")
    elements: List[Dict[str, Any]] = Field(default_factory=list, description="Section elements")

AiResponse

Uniform response returned by AI calls:

class AiResponse(BaseModel):
    content: str = Field(description="Response content (JSON string for planning, text for analysis, unified JSON for documents)")
    metadata: Optional[AiResponseMetadata] = Field(None, description="Response metadata")
    documents: Optional[List[DocumentData]] = Field(None, description="Generated documents (only for document generation operations)")
    
    def toJson(self) -> Dict[str, Any]:
        """Convert AI response content to JSON using enhanced stabilizing failsafe conversion methods"""

AiCallOptions

Options for AI calls:

class AiCallOptions(BaseModel):
    operationType: OperationTypeEnum = Field(default=OperationTypeEnum.DATA_ANALYSE, description="Type of operation")
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level")
    compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
    compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
    processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately")
    maxCost: Optional[float] = Field(default=None, description="Max cost budget")
    maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
    processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode")
    resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
    safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits")
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation")
    maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch")

Validator Models

ValidationMetadata

Validation metadata (in ActionDocument):

# In ActionDocument
validationMetadata: Optional[Dict[str, Any]] = Field(
    None,
    description="Validation metadata for content validation and refinement"
)

Standard-Struktur:

validationMetadata = {
    "actionName": "ai.process",
    "actionParameters": {
        "resultType": "docx",
        "columnsPerRow": 10
    },
    "extractionMethod": "full",
    "sourceDocuments": ["doc1.pdf", "doc2.pdf"],
    # ... further parameters relevant for validation
}

ValidationResult

Result of content validation:

{
    "overallSuccess": bool,
    "qualityScore": float,  # 0.0-1.0
    "dataTypeMatch": bool,
    "formatMatch": bool,
    "documentCount": int,
    "criteriaMapping": [
        {
            "index": int,
            "criterion": str,
            "met": bool,
            "reason": str
        }
    ],
    "gapAnalysis": str,
    "gapType": str,  # "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap"
    "structureComparison": {
        "required": Dict[str, Any],
        "found": Dict[str, Any],
        "gap": Dict[str, Any]
    },
    "improvementSuggestions": List[str],
    "validationDetails": [
        {
            "documentName": str,
            "issues": List[str],
            "suggestions": List[str]
        }
    ]
}

Validation for Iterative Calls

JSON Accumulation with KPI Validation

Problem: For large documents, the validator cannot check the entire document.

Solution: KPI-based validation during generation.

KPI definition:

kpis: List[Dict[str, Any]] = [
    {
        "id": "table_rows",
        "description": "Number of table rows",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='table')].elements[0].rows",
        "targetValue": 100,
        "currentValue": 45
    },
    {
        "id": "list_items",
        "description": "Number of list items",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='list')].elements[0].items",
        "targetValue": 50,
        "currentValue": 30
    }
]

KPI validation during accumulation:

# In each iteration:
1. Extract KPI from AI response (the AI responds with a percentage)
2. Validate KPI progression:
   - If % goes DOWN → Error (e.g., no data received, started new) → Return False
   - If % doesn't move (increment < 1%) → Error (no progress) → Return False
   - If % goes UP (increment >= 1%) → Good progress → Return True
3. Store KPI in accumulation state
4. Continue accumulation if valid, else stop

KPI question for the AI:

=== PROGRESS INDICATOR ===
Based on the delivered data so far, approximately what percentage (%) of the total 
required content has been delivered? 

Respond with an integer between 0-100.

⚠️ IMPORTANT: 
- If percentage goes DOWN in next iteration → Generation will stop (error detected)
- If percentage doesn't increase by at least 1% → Generation will stop (no progress)
- Only continue if percentage increases by 1% or more

Validation with KPIs

Challenge: Large Documents

Problem: Documents can be very large (several MB), so the validator cannot check the entire document.

Solution: A generic approach to validation using KPIs.

KPI-Based Validation

1. KPI Definition

KPIs are created during task definition:

class KpiDefinition(BaseModel):
    id: str = Field(description="KPI identifier")
    description: str = Field(description="Human-readable description")
    jsonPath: str = Field(description="JSONPath expression to extract value")
    targetValue: Union[int, float, str] = Field(description="Target value")
    currentValue: Optional[Union[int, float, str]] = Field(default=None, description="Current value")
    tolerance: Optional[float] = Field(default=0.0, description="Tolerance for numeric values")

2. KPI Extraction During Generation

During JSON accumulation, KPIs are updated continuously:

def extractKpisFromJson(
    jsonData: Dict[str, Any],
    kpiDefinitions: List[KpiDefinition]
) -> List[Dict[str, Any]]:
    """Extract current KPI values from JSON using JSONPath"""
    updatedKpis = []
    
    for kpi in kpiDefinitions:
        # Extract value using JSONPath
        currentValue = jsonPath(jsonData, kpi.jsonPath)
        
        updatedKpis.append({
            "id": kpi.id,
            "description": kpi.description,
            "jsonPath": kpi.jsonPath,
            "targetValue": kpi.targetValue,
            "currentValue": currentValue,
            "progress": calculateProgress(currentValue, kpi.targetValue)
        })
    
    return updatedKpis
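
The helpers jsonPath() and calculateProgress() are not shown in this document. A possible shape for calculateProgress(), together with a JSONPath-free row counter that mirrors the table_rows KPI, could look like this. Both functions are assumptions made for illustration (including the choice to count list values by their length), not the system's implementation.

```python
from typing import Any, Dict


def calculateProgress(currentValue: Any, targetValue: Any) -> float:
    """Return progress in the range 0.0-1.0."""
    # Assumption: a list result (e.g. table rows) is measured by its length.
    if isinstance(currentValue, (list, tuple)):
        currentValue = len(currentValue)
    if isinstance(currentValue, (int, float)) and isinstance(targetValue, (int, float)):
        if targetValue <= 0:
            return 1.0
        return max(0.0, min(1.0, currentValue / targetValue))
    # Non-numeric targets: either reached or not.
    return 1.0 if currentValue == targetValue else 0.0


def countTableRows(jsonData: Dict[str, Any]) -> int:
    """Count rows of the first element of each table section in the first document."""
    documents = jsonData.get("documents", [])
    if not documents:
        return 0
    total = 0
    for section in documents[0].get("sections", []):
        if section.get("content_type") == "table":
            elements = section.get("elements", [])
            if elements:
                total += len(elements[0].get("rows", []))
    return total


partial = {"documents": [{"sections": [
    {"content_type": "table", "elements": [{"rows": [["a"], ["b"], ["c"]]}]},
    {"content_type": "heading", "elements": [{"text": "T", "level": 1}]},
]}]}
rowProgress = calculateProgress(countTableRows(partial), 4)
```

Clamping to the 0.0-1.0 range keeps an over-delivering generation (more rows than targeted) from reporting more than 100% progress.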

3. KPI Validation

def validateKpiProgression(
    accumulationState: JsonAccumulationState,
    currentKpi: int
) -> bool:
    """Validate KPI progression from AI response"""
    # Note: lastKpi is not among the JsonAccumulationState fields listed earlier,
    # so it is read defensively and treated as 0 when absent or unset.
    lastKpi = getattr(accumulationState, "lastKpi", None) or 0
    increment = currentKpi - lastKpi
    
    if increment < 0:
        return False  # Went down - error
    if increment < 1:
        return False  # No progress - error
    return True  # Progress - good
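
How the progression rule stops a generation loop can be seen by feeding it a series of reported percentages. The sketch below uses a simplified, state-free variant of the check; isProgressing and acceptedIterations are hypothetical names introduced only for this example.

```python
from typing import List


def isProgressing(lastKpi: int, currentKpi: int) -> bool:
    """True only if the reported percentage rose by at least one point."""
    return currentKpi - lastKpi >= 1


def acceptedIterations(reportedPercentages: List[int]) -> int:
    """Count how many iterations pass before the progression check stops the loop."""
    lastKpi = 0
    accepted = 0
    for currentKpi in reportedPercentages:
        if not isProgressing(lastKpi, currentKpi):
            break                 # went down or stalled: stop accumulating
        lastKpi = currentKpi
        accepted += 1
    return accepted


stalled = acceptedIterations([10, 35, 35, 80])   # third report repeats 35
regressed = acceptedIterations([20, 15])         # second report goes down
```

Note that with a starting value of 0, a first report of 0% already counts as "no progress" and stops the loop.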

4. Content Validator with KPI Support

The Content Validator uses KPIs to validate large documents:

async def validateContent(
    self,
    documents: List[Any],
    intent: Dict[str, Any],
    taskStep: Optional[Any] = None,
    actionName: Optional[str] = None,
    actionParameters: Optional[Dict[str, Any]] = None,
    actionHistory: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """Validates delivered content against user intent using AI"""
    
    # Extract KPI definitions from taskStep or intent
    kpiDefinitions = extractKpiDefinitions(taskStep, intent)
    
    # For large documents: Use KPI-based validation
    if isLargeDocument(documents):
        return await self._validateWithKpis(documents, intent, kpiDefinitions, taskStep)
    
    # For small documents: Use full content validation
    return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory)

Structure-Based Validation

For large documents, only the structure is validated, not the entire content:

def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
    """Summarize JSON document structure for validation"""
    summary = {
        "metadata": {},
        "sections": [],
        "statistics": {}
    }
    
    # Extract metadata
    metadata = jsonData.get("metadata", {})
    if metadata and isinstance(metadata, dict):
        summary["metadata"] = dict(metadata)
    
    # Extract sections with statistics
    documents = jsonData.get("documents", [])
    if documents:
        summary["statistics"]["documentCount"] = len(documents)
        if len(documents) > 0:
            doc = documents[0]
            docSections = doc.get("sections", [])
            summary["statistics"]["sectionCount"] = len(docSections)
            
            # Summarize sections (without full content)
            for section in docSections:
                sectionSummary = {
                    "id": section.get("id"),
                    "content_type": section.get("content_type"),
                    "title": section.get("title"),
                    "order": section.get("order")
                }
                
                # For tables: extract caption and statistics
                if section.get("content_type") == "table":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        tableElement = elements[0]
                        sectionSummary["caption"] = tableElement.get("caption")
                        headers = tableElement.get("headers", [])
                        rows = tableElement.get("rows", [])
                        sectionSummary["columnCount"] = len(headers)
                        sectionSummary["rowCount"] = len(rows)
                        sectionSummary["headers"] = headers
                
                # For lists: extract item count
                elif section.get("content_type") == "list":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        listElement = elements[0]
                        items = listElement.get("items", [])
                        sectionSummary["itemCount"] = len(items)
                
                summary["sections"].append(sectionSummary)
    
    return summary

Advantages:

  • Validates the structure without passing the entire content to the validator
  • Uses statistics (number of rows, items, etc.) for validation
  • Scales to very large documents (GB range)

Code Structure

Gateway Architecture

gateway/
├── modules/
│   ├── routes/              # API Routes
│   ├── security/            # Security & Middlewares
│   ├── shared/              # Shared Utilities
│   ├── services/            # Business Logic Services
│   │   ├── serviceAi.py     # AI Service (callAiPlanning, callAiContent)
│   │   ├── serviceDocumentExtraction.py  # Extraction Service
│   │   └── serviceDocumentGeneration.py  # Generation Service
│   ├── interfaces/          # Interface Layer
│   │   └── interfaceAiObjects.py  # AI Objects Interface
│   ├── connectors/          # External Connectors
│   │   ├── connectorAiOpenai.py
│   │   ├── connectorAiAnthropic.py
│   │   └── ...
│   ├── datamodels/          # Pydantic Models
│   │   ├── datamodelAi.py
│   │   ├── datamodelExtraction.py
│   │   ├── datamodelWorkflow.py
│   │   └── ...
│   └── workflows/           # Workflow Processing
│       ├── processing/
│       │   ├── core/
│       │   │   └── validator.py  # Workflow Validator
│       │   └── adaptive/
│       │       └── contentValidator.py  # Content Validator
│       └── methods/
│           ├── methodBase.py
│           ├── methodAi/
│           │   ├── methodAi.py
│           │   └── actions/
│           │       ├── process.py
│           │       └── ...
│           └── methodContext/
│               └── actions/
│                   ├── extractContent.py
│                   └── neutralizeData.py

Service Dependency Hierarchy

aiService (independent)
  └─> No dependencies

generationService (independent)
  └─> No dependencies

extractionService (independent)
  └─> No dependencies

chatService (depends on aiService, generationService)
  ├─> Uses: aiService (for AI calls)
  └─> Uses: generationService (for document rendering)

workflowProcessor (depends on all services)
  ├─> Uses: chatService
  ├─> Uses: aiService
  ├─> Uses: generationService
  └─> Uses: extractionService
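The hierarchy above can be written down as a dependency graph and checked for cycles. The following is a minimal sketch using the standard-library `graphlib` module (Python 3.9+); the dictionary layout is an illustration, not how the gateway itself wires its services.

```python
from graphlib import TopologicalSorter

# Each service maps to the set of services it depends on (taken from the hierarchy above)
serviceDependencies = {
    "aiService": set(),
    "generationService": set(),
    "extractionService": set(),
    "chatService": {"aiService", "generationService"},
    "workflowProcessor": {"chatService", "aiService", "generationService", "extractionService"},
}

# static_order() yields dependencies before their dependents
# and raises graphlib.CycleError if the graph contains a cycle
initOrder = list(TopologicalSorter(serviceDependencies).static_order())
```

The resulting order is one valid initialization sequence: the three independent services come first, then chatService, and workflowProcessor last.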

Important Files

AI Service

  • gateway/modules/services/serviceAi.py: Central AI service implementation
    • callAiPlanning(): For planning operations
    • callAiContent(): For content processing (unified)

Extraction Service

  • gateway/modules/services/serviceDocumentExtraction.py: Extraction Service
    • extractContent(): Phase 1 - Pure extraction
    • processDocumentsPerChunk(): Phase 2 - AI-based extraction with chunking

Generation Service

  • gateway/modules/services/serviceDocumentGeneration.py: Generation Service
    • renderReport(): Renders Unified JSON to the target format

Validators

  • gateway/modules/workflows/processing/core/validator.py: Workflow Validator
    • validateTask(): Validates the task structure
    • validateAction(): Validates the action structure
  • gateway/modules/workflows/processing/adaptive/contentValidator.py: Content Validator
    • validateContent(): Validates generated content against the user intent
    • _validateWithAI(): AI-based validation
    • _summarizeJsonStructure(): Structure summary for large documents

Data Models

  • gateway/modules/datamodels/datamodelAi.py: AI Models
    • AiCallOptions, AiResponse, JsonAccumulationState, etc.
  • gateway/modules/datamodels/datamodelExtraction.py: Extraction Models
    • ContentPart, ContentExtracted, ExtractionOptions, etc.
  • gateway/modules/datamodels/datamodelWorkflow.py: Workflow Models
    • ChatWorkflow, TaskContext, ActionDefinition, etc.

Further Important Topics

1. Action Registry Pattern

Each action function is defined in the same module as its Pydantic parameter model:

# In methodAi/actions/process.py
from typing import List, Optional

from pydantic import BaseModel, Field

# ContentPart and ActionResult are project data models (imports omitted here)


class AiProcessParameters(BaseModel):
    """Parameters for AI processing action"""
    aiPrompt: str = Field(description="AI instruction prompt")
    contentParts: Optional[List[ContentPart]] = Field(None, description="Already-extracted content parts")
    resultType: str = Field(default="txt", description="Output file extension")


async def process(parameters: AiProcessParameters) -> ActionResult:
    """Execute AI processing action"""
    # Implementation...

Advantages:

  • Type safety: Parameters are validated against the Pydantic model at runtime
  • Co-location: The action and its parameters are defined together
  • Extensibility: New actions can be added easily
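How such a registry might tie action names to handlers and parameter models is sketched below. This is an illustration under assumptions: `ACTION_REGISTRY`, `registerAction`, `dispatch`, and the `demo.echo` action are hypothetical names, the handler is synchronous for brevity (the action shown above is `async`), and a standard-library dataclass stands in for the Pydantic model so the sketch runs without dependencies.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple, Type

# Hypothetical registry: action name -> (parameter model, handler function)
ACTION_REGISTRY: Dict[str, Tuple[Type[Any], Callable[[Any], Any]]] = {}


def registerAction(name: str, parameterModel: Type[Any]):
    """Decorator that records a handler together with its parameter model."""
    def decorator(handler):
        ACTION_REGISTRY[name] = (parameterModel, handler)
        return handler
    return decorator


@dataclass
class EchoParameters:
    # Stand-in for a Pydantic parameter model such as AiProcessParameters
    aiPrompt: str
    resultType: str = "txt"


@registerAction("demo.echo", EchoParameters)
def echo(parameters: EchoParameters) -> str:
    return f"{parameters.aiPrompt}.{parameters.resultType}"


def dispatch(name: str, rawParameters: Dict[str, Any]) -> Any:
    """Look up an action, build its parameter object, and call the handler."""
    parameterModel, handler = ACTION_REGISTRY[name]
    # A missing required field raises TypeError here; Pydantic would raise ValidationError
    return handler(parameterModel(**rawParameters))


result = dispatch("demo.echo", {"aiPrompt": "summarize"})
```

Keeping the parameter model next to the handler means the dispatcher needs no per-action knowledge: adding an action is one decorated function in one module.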

2. Validation Metadata Pattern

All actions that return an ActionDocument MUST include validationMetadata:

# In MethodBase
def _createValidationMetadata(self, actionName: str, **kwargs) -> Dict[str, Any]:
    """Create standardized validationMetadata for ActionDocument instances"""
    return {
        "actionName": actionName,
        **kwargs  # Action-specific parameters
    }

# In Action
validationMetadata = self._createValidationMetadata(
    actionName="ai.process",
    resultType="docx",
    columnsPerRow=10,
    extractionMethod="full"
)

return ActionResult(
    success=True,
    documents=[
        ActionDocument(
            documentName="report.docx",
            documentData=renderedBytes,
            mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            validationMetadata=validationMetadata  # REQUIRED
        )
    ]
)
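The requirement could be enforced with a small check before a result is handed to the validators. The sketch below is an assumption about how that might be done: `findMissingValidationMetadata` is a hypothetical helper, and plain dictionaries stand in for ActionDocument instances.

```python
from typing import Any, Dict, List


def findMissingValidationMetadata(documents: List[Dict[str, Any]]) -> List[str]:
    """Return names of documents without usable validationMetadata (hypothetical check)."""
    missing: List[str] = []
    for document in documents:
        metadata = document.get("validationMetadata")
        # actionName is the one key that _createValidationMetadata always sets
        if not isinstance(metadata, dict) or not metadata.get("actionName"):
            missing.append(document.get("documentName", "<unnamed>"))
    return missing


documents = [
    {"documentName": "report.docx",
     "validationMetadata": {"actionName": "ai.process", "resultType": "docx"}},
    {"documentName": "notes.txt"},
]
missing = findMissingValidationMetadata(documents)
```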

3. Document Reference System

Documents are referenced through typed references:

class DocumentReference(BaseModel):
    """Base class for document references"""
    pass

class DocumentListReference(DocumentReference):
    """Reference to a document list via message label"""
    messageId: Optional[str] = None
    label: str
    
    def to_string(self) -> str:
        if self.messageId:
            return f"docList:{self.messageId}:{self.label}"
        return f"docList:{self.label}"

class DocumentItemReference(DocumentReference):
    """Reference to a specific document item"""
    documentId: str
    fileName: Optional[str] = None
    
    def to_string(self) -> str:
        if self.fileName:
            return f"docItem:{self.documentId}:{self.fileName}"
        return f"docItem:{self.documentId}"

Usage:

# In Action Definition
documentList = DocumentReferenceList(
    references=[
        DocumentListReference(messageId="msg_123", label="task1_results"),
        DocumentItemReference(documentId="doc_456", fileName="report.pdf")
    ]
)

# Convert to string list for action parameters
stringRefs = documentList.to_string_list()
# ["docList:msg_123:task1_results", "docItem:doc_456:report.pdf"]
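On the receiving side, the string form has to be split back into its parts. The function below is a minimal sketch of such an inverse; `parseDocumentReference` is a hypothetical name and returns plain dictionaries rather than the Pydantic reference classes.

```python
from typing import Dict, Optional


def parseDocumentReference(reference: str) -> Dict[str, Optional[str]]:
    """Split a reference string into its parts (hypothetical inverse of to_string())."""
    # maxsplit=2 keeps any further colons inside the last field
    kind, *rest = reference.split(":", 2)
    if kind == "docList":
        if len(rest) == 2:
            return {"kind": kind, "messageId": rest[0], "label": rest[1]}
        if len(rest) == 1:
            return {"kind": kind, "messageId": None, "label": rest[0]}
    elif kind == "docItem":
        if len(rest) == 2:
            return {"kind": kind, "documentId": rest[0], "fileName": rest[1]}
        if len(rest) == 1:
            return {"kind": kind, "documentId": rest[0], "fileName": None}
    raise ValueError(f"Unrecognized document reference: {reference!r}")


listRef = parseDocumentReference("docList:msg_123:task1_results")
itemRef = parseDocumentReference("docItem:doc_456")
```

One caveat of the colon-separated format: a label or document ID that itself contains a colon cannot be distinguished from a two-part reference, so identifiers are assumed to be colon-free here.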

4. Document Persistence Strategy

Phase 1: Within Running Task

  • Documents are passed directly as ActionResult + ActionDocument
  • NO ChatDocuments are created
  • Documents are ephemeral but available within the task

Phase 2: Between Tasks/Rounds

  • Documents are persisted via ChatMessage + ChatDocuments
  • Required for docList: references
  • documentsLabel is set for later referencing

Phase 3: User Delivery

  • Complete ChatMessage (in the user's language) + ChatDocuments
  • For user-facing workflows

5. Error Handling and Retry Logic

Model Failover

  • If a model fails, the next model from the failover list is selected automatically
  • The chunk size is recalculated for the new model
  • Processing continues with the new model
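The failover steps above can be sketched as a simple loop. The names `callWithFailover`, `callModel`, and `chunkSizeFor` are assumptions for illustration, as are the model names and chunk sizes in the demo; the actual service may structure this differently.

```python
from typing import Callable, List, Tuple


def callWithFailover(
    models: List[str],
    callModel: Callable[[str, int], str],
    chunkSizeFor: Callable[[str], int],
) -> Tuple[str, str]:
    """Try each model in order; return (modelName, response) from the first success."""
    errors: List[str] = []
    for modelName in models:
        # The chunk size is recalculated for every model that is tried
        chunkSize = chunkSizeFor(modelName)
        try:
            return modelName, callModel(modelName, chunkSize)
        except Exception as error:  # broad on purpose in this sketch
            errors.append(f"{modelName}: {error}")
    raise RuntimeError("All models failed: " + "; ".join(errors))


def fakeCall(modelName: str, chunkSize: int) -> str:
    # Simulated connector: the first model is unavailable
    if modelName == "model-a":
        raise TimeoutError("simulated outage")
    return f"ok from {modelName} with chunk size {chunkSize}"


chunkSizes = {"model-a": 8000, "model-b": 4000}
usedModel, response = callWithFailover(["model-a", "model-b"], fakeCall, chunkSizes.__getitem__)
```

Collecting the per-model errors keeps the final exception informative when the whole failover list is exhausted.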

JSON Repair

  • For broken JSON strings, repairBrokenJson() is used
  • It attempts to repair the JSON structure
  • If the repair fails, processing continues with the previously accumulated string
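The fallback rule in the last bullet can be illustrated as follows. This is a sketch under assumptions: `accumulateJson` is a hypothetical name, and the repair function is passed in as a callable that returns the repaired string or `None`, which may not match the real signature of repairBrokenJson().

```python
import json
from typing import Callable, Optional


def accumulateJson(
    previous: str,
    newFragment: str,
    repair: Callable[[str], Optional[str]],
) -> str:
    """Append a fragment; fall back to the previous string if it cannot be repaired."""
    candidate = previous + newFragment
    try:
        json.loads(candidate)
        return candidate
    except json.JSONDecodeError:
        repaired = repair(candidate)
        # Repair failed: keep the last known-good accumulated string
        return repaired if repaired is not None else previous


# A repair function that always gives up, to show the fallback path
kept = accumulateJson('{"a": 1}', " garbage", lambda text: None)
```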

6. Progress Logging

Progress is logged at several levels:

# In AI Service
self.services.chat.progressLogStart(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    message="Starting AI call..."
)

# ... processing ...

self.services.chat.progressLogFinish(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    success=True
)
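Pairing the start and finish calls by hand makes it easy to miss the finish call when an exception occurs. One option is to wrap both in a context manager, as in the sketch below. `progressLog` is a hypothetical helper, and `RecordingChat` and `FakeWorkflow` are stand-ins that exist only to make the example runnable; the keyword arguments mirror the calls shown above.

```python
from contextlib import contextmanager


@contextmanager
def progressLog(chat, workflow, message: str):
    """Emit progressLogStart on entry and progressLogFinish on exit, even on errors."""
    ids = dict(
        workflowId=workflow.id,
        roundIndex=workflow.getRoundIndex(),
        taskIndex=workflow.getTaskIndex(),
        actionIndex=workflow.getActionIndex(),
    )
    chat.progressLogStart(message=message, **ids)
    success = False
    try:
        yield
        success = True
    finally:
        # Runs for both normal exit and exceptions; success stays False on errors
        chat.progressLogFinish(success=success, **ids)


class FakeWorkflow:
    # Stand-in for ChatWorkflow with fixed indices
    id = "wf_1"

    def getRoundIndex(self): return 1
    def getTaskIndex(self): return 0
    def getActionIndex(self): return 0


class RecordingChat:
    # Stand-in for the chat service that records calls instead of logging
    def __init__(self):
        self.events = []

    def progressLogStart(self, **kwargs):
        self.events.append(("start", kwargs))

    def progressLogFinish(self, **kwargs):
        self.events.append(("finish", kwargs))


chat = RecordingChat()
with progressLog(chat, FakeWorkflow(), "Starting AI call..."):
    pass  # ... processing ...
```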

7. Debugging and Tracing

Debug Files

  • AI prompts are saved as debug files
  • JSON responses are saved as debug files
  • The debug type is used for categorization

JSON Extraction Utilities

  • extractJsonString(): Extracts JSON from text with code fences
  • tryParseJson(): Safe JSON parsing with error handling
  • repairBrokenJson(): Repairs broken or incomplete JSON strings
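To make the roles of these three utilities concrete, here is a simplified sketch of what they might do. The function names come from the list above, but the signatures, return types, and the repair strategy (closing open strings and brackets of a truncated response) are assumptions and are likely less thorough than the real implementations.

```python
import json
import re
from typing import Any, Optional

FENCE = "`" * 3  # built programmatically to avoid a literal fence inside this example
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extractJsonString(text: str) -> str:
    """Return the content of the first fenced block, or the stripped text if none exists."""
    match = FENCED_BLOCK.search(text)
    return (match.group(1) if match else text).strip()


def tryParseJson(text: str) -> Optional[Any]:
    """Parse JSON and return None instead of raising on invalid input."""
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        return None


def repairBrokenJson(text: str) -> Optional[str]:
    """Close an unterminated string and open brackets; return None if still invalid."""
    closers = []
    inString = False
    escaped = False
    for char in text:
        if inString:
            if escaped:
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                inString = False
        elif char == '"':
            inString = True
        elif char in "{[":
            closers.append("}" if char == "{" else "]")
        elif char in "}]" and closers:
            closers.pop()
    repaired = text + ('"' if inString else "") + "".join(reversed(closers))
    try:
        json.loads(repaired)
    except json.JSONDecodeError:
        return None
    return repaired


raw = "Result:\n" + FENCE + 'json\n{"a": 1}\n' + FENCE
parsed = tryParseJson(extractJsonString(raw))
repaired = repairBrokenJson('{"items": [1, 2, {"name": "ab')
```

This repair only handles output that was cut off mid-value; inputs such as a dangling key without a value still fail and yield `None`.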

Summary

The PowerOn AI system uses:

  1. A dynamic workflow with a Round → Task → Action hierarchy
  2. Standardized data models for all components
  3. A 4-phase model: Extraction → Neutralization → Generation → Rendering
  4. KPI-based validation for large documents
  5. Model-aware chunking for optimal resource usage
  6. JSON accumulation for iterative generation
  7. Type-safe parameter validation through Pydantic models
  8. Structure-based validation for large documents

All components are generic and extensible, so new actions, extractors, renderers, and validators can be added easily.