PowerOn AI System Documentation - Validators and Workflow Mechanisms
Table of Contents
- Overview
- Dynamic Workflow
- Generic AI Call Flow
- Standardized Data Models
- Validation with KPIs
- Code Structure
- Other Important Topics
Overview
The PowerOn AI system is a multi-layered system for processing documents and generating content, with built-in validation mechanisms. It uses a dynamic workflow approach with standardized data models and multi-stage validators.
Core Components
- Workflow Processor: orchestrates the execution of tasks and actions
- AI Service: central interface for all AI operations
- Extraction Service: extracts content from documents
- Generation Service: generates and renders documents
- Content Validator: validates generated content against the user's intent
- Workflow Validator: validates workflow structures
Dynamic Workflow
Workflow Architecture
The dynamic workflow is based on a Round → Task → Action hierarchy:
User Request
↓
Round (each user input = a new round)
↓
Task (a round can contain several tasks)
↓
Action (a task can contain several actions)
Standardized Elements
1. ChatWorkflow (State Management)
The ChatWorkflow object manages the entire execution state:
from pydantic import BaseModel

class ChatWorkflow(BaseModel):
    # Execution state
    currentRound: int = 0   # current round
    currentTask: int = 0    # current task within the round
    currentAction: int = 0  # current action within the task

    def incrementRound(self):
        """Increment round when new user input received"""
        self.currentRound += 1
        self.currentTask = 0
        self.currentAction = 0

    def incrementTask(self):
        """Increment task when starting new task"""
        self.currentTask += 1
        self.currentAction = 0

    def incrementAction(self):
        """Increment action when executing new action"""
        self.currentAction += 1
Important: All execution state is managed in the ChatWorkflow object; it is not passed as separate parameters through the call chain.
2. TaskContext (Context Management)
The TaskContext holds all information needed to execute a task:
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field

class TaskContext(BaseModel):
    # Stage 1 context fields
    actionObjective: Optional[str] = None
    parametersContext: Optional[str] = None
    learnings: List[str] = Field(default_factory=list)
    stage1Selection: Optional[Dict[str, Any]] = None

    # Quoted annotation: ActionDefinition is defined further below
    def updateFromSelection(self, selection: "ActionDefinition"):
        """Update context from Stage 1 selection"""
        self.actionObjective = selection.actionObjective
        self.parametersContext = selection.parametersContext
        self.learnings = selection.learnings
        self.stage1Selection = selection.model_dump()
3. ActionDefinition (Action Selection and Parameters)
The ActionDefinition combines action selection and parameters:
class ActionDefinition(BaseModel):
    # Core action selection (Stage 1)
    action: str = Field(description="Compound action name (method.action)")
    actionObjective: str = Field(description="Objective for this action")
    parametersContext: Optional[str] = Field(None, description="Context for parameter generation")
    learnings: List[str] = Field(default_factory=list, description="Learnings from previous actions")
    # Resources (ALWAYS defined in Stage 1 if action needs them)
    documentList: Optional[DocumentReferenceList] = Field(None, description="Document references")
    connectionReference: Optional[str] = Field(None, description="Connection reference")
    # Parameters (may be defined in Stage 1 OR Stage 2)
    parameters: Optional[Dict[str, Any]] = Field(None, description="Action-specific parameters")

    def needsStage2(self) -> bool:
        """Determine if Stage 2 parameter generation is needed"""
        return self.parameters is None
Workflow Sequence
High-Level Flow
User Request (prompt + documents)
↓
Request Reception → RequestContext
↓
Complexity Detection → "simple" | "moderate" | "complex"
↓
[If simple] Fast Path
├─> fastPathExecute() → ActionResult
└─> Return to user (5-15s)
↓
[If complex] Full Workflow
├─> initialUnderstanding() → UnderstandingResult
├─> Create TaskDefinition[] from UnderstandingResult
├─> For each task:
│ ├─> executeTask(TaskDefinition) → TaskResult
│ │ ├─> Web Research (if needed)
│ │ ├─> Document Extraction (separate action)
│ │ ├─> Information Analysis (AI with extracted content)
│ │ ├─> Content Generation (AI with extracted content)
│ │ └─> Format Rendering (unified JSON → format)
│ └─> persistTaskResult() → ChatMessage
└─> Return to user (30-120s)
Action-Level Flow (Within a Task)
modeDynamic.executeTask(taskStep, workflow, context)
↓
_planSelect(context) → ActionDefinition
├─> aiService.callAiPlanning() → AiResponse
└─> parseJsonWithModel() → ActionDefinition
↓
_actExecute(context, selection, ...) → ActionResult
├─> context.updateFromSelection(selection)
├─> If selection.needsStage2() (Stage 2 needed):
│     ├─> aiService.callAiPlanning() → AiResponse
│     └─> parseJsonWithModel() → Update selection with parameters
└─> actionExecutor.executeSingleAction() → ActionResult
      └─> methodAi.process(parameters) → ActionResult
            └─> aiService.callAiContent(prompt, contentParts=...) → AiResponse
↓
_observeBuild() → Observation
↓
_refineDecide() → ReviewResult
Validation Mechanisms in the Workflow
1. Workflow Validator
Validates the structure of task plans and actions:
from typing import Any, Dict, List

class WorkflowValidator:
    def validateTask(self, taskPlan: Dict[str, Any]) -> bool:
        """Validate task plan structure"""
        # Checks:
        # - task structure (id, objective, successCriteria)
        # - dependencies (no cyclic dependencies)
        # - unique task IDs
        ...

    def validateAction(self, actions: List[Dict[str, Any]], context) -> bool:
        """Validate action structure"""
        # Checks:
        # - action structure (action, parameters, resultLabel)
        # - compound action format (method.action)
        # - result labels start with 'round'
        ...
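The task checks listed above can be sketched as one self-contained function. This is an illustration, not the project's validateTask: the name validateTaskPlanSketch is hypothetical, and it assumes the plan is a dict with a "tasks" list whose entries may carry a "dependencies" list of task IDs. Cycles are detected with a three-colour depth-first search.

```python
from typing import Any, Dict, List


def validateTaskPlanSketch(taskPlan: Dict[str, Any]) -> bool:
    """Hypothetical check: required fields, unique IDs, known and acyclic dependencies."""
    tasks: List[Dict[str, Any]] = taskPlan.get("tasks", [])
    # Every task needs a non-empty id, objective and successCriteria
    for task in tasks:
        if not all(task.get(key) for key in ("id", "objective", "successCriteria")):
            return False
    ids = [task["id"] for task in tasks]
    if len(ids) != len(set(ids)):
        return False  # duplicate task IDs
    deps = {task["id"]: list(task.get("dependencies", [])) for task in tasks}
    # Dependencies must reference existing tasks
    if any(dep not in deps for depList in deps.values() for dep in depList):
        return False
    # Three-colour DFS: GREY marks nodes on the current path, so reaching one means a cycle
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {taskId: WHITE for taskId in deps}

    def hasCycle(node: str) -> bool:
        colour[node] = GREY
        for nxt in deps[node]:
            if colour[nxt] == GREY:
                return True
            if colour[nxt] == WHITE and hasCycle(nxt):
                return True
        colour[node] = BLACK
        return False

    return not any(colour[t] == WHITE and hasCycle(t) for t in deps)
```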
2. Parameter Validation
Parameters are validated against the action schemas:
# In MethodBase._validateParameters()
def _validateParameters(self, parameters: Dict[str, Any],
                        paramDefs: Dict[str, WorkflowActionParameter]) -> Dict[str, Any]:
    """Validate parameters against definitions"""
    validated = {}
    for paramName, paramDef in paramDefs.items():
        value = parameters.get(paramName)
        # Check required
        if paramDef.required and value is None:
            raise ValueError(f"Required parameter '{paramName}' is missing")
        # Use default if not provided
        if value is None and paramDef.default is not None:
            value = paramDef.default
        # Type validation
        if value is not None:
            value = self._validateType(value, paramDef.type)
        # Custom validation rules
        if paramDef.validation and value is not None:
            self._applyValidationRules(value, paramDef.validation)
        validated[paramName] = value
    return validated
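The method above depends on helpers (_validateType, _applyValidationRules) and on WorkflowActionParameter, none of which are shown. A standalone sketch of the same control flow follows; ParamDef is a hypothetical stand-in for WorkflowActionParameter, type checking is reduced to calling the target type as a converter, and custom rules are assumed to be simple predicates.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional


@dataclass
class ParamDef:
    """Hypothetical stand-in for WorkflowActionParameter."""
    type: type
    required: bool = False
    default: Any = None
    validation: Optional[Callable[[Any], bool]] = None  # assumed: a predicate


def validateParameters(parameters: Dict[str, Any], paramDefs: Dict[str, ParamDef]) -> Dict[str, Any]:
    validated: Dict[str, Any] = {}
    for name, pdef in paramDefs.items():
        value = parameters.get(name)
        if pdef.required and value is None:
            raise ValueError(f"Required parameter '{name}' is missing")
        if value is None and pdef.default is not None:
            value = pdef.default
        if value is not None and not isinstance(value, pdef.type):
            try:
                value = pdef.type(value)  # simple coercion, e.g. "10" -> 10
            except (TypeError, ValueError) as exc:
                raise ValueError(f"Parameter '{name}' must be {pdef.type.__name__}") from exc
        if pdef.validation and value is not None and not pdef.validation(value):
            raise ValueError(f"Parameter '{name}' failed validation")
        validated[name] = value
    return validated
```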
Generic AI Call Flow
Overview: Extraction → Neutralization → Generation → Rendering
The system follows a clear four-phase model:
1. EXTRACTION: Documents → ContentParts
2. NEUTRALIZATION: ContentParts → neutralized ContentParts (optional)
3. GENERATION: ContentParts + Prompt → Unified JSON
4. RENDERING: Unified JSON → Format (DOCX, PDF, XLSX, etc.)
Phase 1: Extraction
Separate Extraction Action
Important: Extraction is a separate action, not part of the AI call.
# Extraction Action: document.extractContent
async def extractContent(
    parameters: ExtractContentParameters  # Pydantic: ExtractContentParameters
) -> ActionResult:  # Pydantic: ActionResult
    ...
Parameters Model:
class ExtractContentParameters(BaseModel):
    documentList: DocumentReferenceList = Field(description="Document references")
    extractionOptions: Optional[ExtractionOptions] = Field(
        None,
        description="Extraction options (dynamic, not hardcoded)"
    )
Returns: an ActionResult with an ActionDocument containing ContentExtracted objects.
Two-Phase Extraction Model
Phase 1: Pure Content Extraction (No AI)
Method: extractionService.extractContent(documents, options)
Characteristics:
- ✅ No size limits - extracts complete documents regardless of size
- ✅ No AI calls - purely format-specific extraction (PDF, DOCX, etc.)
- ✅ No chunking - the full content is extracted as-is into ContentPart objects
- ✅ Format-aware - uses the appropriate extractor for each document type
Output: List[ContentExtracted], where each entry contains a List[ContentPart]
Phase 2: AI-Based Content Extraction (with Intelligent Model-Aware Chunking)
Method: extractionService.processDocumentsPerChunk(documents, prompt, aiObjects, options)
Characteristics:
- ✅ Model-aware chunking - the chunk size is computed dynamically from the selected model
- ✅ No fixed sizes - only the model's contextLength and maxTokens act as limits
- ✅ Per-call model selection - each AI call can use a different model (failover)
- ✅ Dynamic chunking - chunking is recalculated for every model in the failover chain
Chunking calculation (per model):
modelContextTokens = model.contextLength # Model's total context window
modelMaxOutputTokens = model.maxTokens # Model's max output tokens
promptSize = len(prompt.encode('utf-8')) # Input prompt size in bytes
# Calculate prompt tokens (approximate: 1 token ≈ 4 bytes)
promptTokens = promptSize / 4
# Reserve tokens for:
systemMessageTokens = 10 # System message overhead
outputTokens = modelMaxOutputTokens # Output reservation
messageOverheadTokens = 100 # JSON/message structure overhead
totalReservedTokens = (promptTokens + systemMessageTokens +
                       messageOverheadTokens + outputTokens)
# Calculate remaining context available for content:
remainingContextTokens = modelContextTokens - totalReservedTokens
# Available for content (80% safety margin):
availableContentTokens = int(remainingContextTokens * 0.8)
availableContentBytes = availableContentTokens * 4 # Convert back to bytes
# Chunk size (70% of available for text, 80% for images):
textChunkSize = int(availableContentBytes * 0.7)
imageChunkSize = int(availableContentBytes * 0.8)
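Wrapped as a function, the same arithmetic can be tried out for different models. This is a sketch; the function name calculateChunkSizes is hypothetical, and the max(0, ...) guard against negative sizes is an addition that is not part of the formula above.

```python
def calculateChunkSizes(contextLength: int, maxTokens: int, prompt: str) -> dict:
    """Per-model chunk sizes in bytes, following the arithmetic above (1 token ~ 4 bytes)."""
    promptTokens = len(prompt.encode("utf-8")) / 4
    systemMessageTokens = 10      # system message overhead
    messageOverheadTokens = 100   # JSON/message structure overhead
    outputTokens = maxTokens      # the full output budget is reserved
    totalReservedTokens = (promptTokens + systemMessageTokens
                           + messageOverheadTokens + outputTokens)
    remainingContextTokens = contextLength - totalReservedTokens
    # Added guard (not in the original formula): never return negative sizes
    availableContentTokens = max(0, int(remainingContextTokens * 0.8))
    availableContentBytes = availableContentTokens * 4
    return {
        "textChunkSize": int(availableContentBytes * 0.7),
        "imageChunkSize": int(availableContentBytes * 0.8),
    }
```

Because the whole maxTokens budget is reserved for output, a model whose reservations use up its context window gets a chunk size of 0. A caller could treat that result as a reason to fail over to the next model.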
Pipeline Architecture:
for each ContentPart:
    # Initialize pipeline state
    processedChunks = []  # Already processed chunks
    remainingContent = contentPart.data  # Content not yet chunked/processed
    while remainingContent:
        1. Model Selection: Select best model for remaining content
        2. Calculate chunk size for CURRENT model
        3. Create NEXT chunk on-demand (lazy chunking)
        4. Process chunk with AI
        5. Update pipeline state
        6. If model fails: Select next model, recalculate chunk size
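The pipeline above can be sketched as runnable code under a few simplifying assumptions. "Select best model" is replaced by walking a fixed, ordered failover list. ModelInfo and callModel are hypothetical stand-ins for the model metadata and the AI call. Chunks are cut by character count, not bytes.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ModelInfo:
    """Hypothetical model descriptor; chunkSize would come from the per-model formula."""
    name: str
    chunkSize: int  # measured in characters here for simplicity


def processContentPart(data: str, models: List[ModelInfo],
                       callModel: Callable[[ModelInfo, str], str]) -> List[str]:
    """Lazily chunk one ContentPart's data, failing over to the next model on error."""
    processedChunks: List[str] = []
    remainingContent = data
    modelIndex = 0
    while remainingContent:
        if modelIndex >= len(models):
            raise RuntimeError("All models in the failover chain failed")
        model = models[modelIndex]
        if model.chunkSize <= 0:
            modelIndex += 1  # no room for content in this model's context window
            continue
        # Only the NEXT chunk is cut, sized for the CURRENT model (lazy chunking)
        chunk = remainingContent[:model.chunkSize]
        try:
            processedChunks.append(callModel(model, chunk))
        except Exception:
            modelIndex += 1  # failover: the next loop re-sizes the chunk for the new model
            continue
        remainingContent = remainingContent[len(chunk):]
    return processedChunks
```

Cutting chunks one at a time means that a failover changes only the size of the chunks still to come. Chunks that were already processed are kept.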
Phase 2: Neutralization (Optional)
Neutralization Action
# Neutralization Action: context.neutralizeData
async def neutralizeData(
    parameters: NeutralizeDataParameters
) -> ActionResult:
    ...
Purpose: removes personal/sensitive data from ContentParts before generation.
Characteristics:
- Optional - only used when needed
- Uses AI to identify and neutralize the data
- Preserves the structure of the ContentParts
Phase 3: Generation
AI Service: callAiContent()
Signature:
async def callAiContent(
    prompt: str,                                       # Proprietary: str
    *,                                                 # '*' keeps this valid Python: a required parameter cannot follow a defaulted one positionally
    contentParts: Optional[List[ContentPart]] = None,  # Pydantic: List[ContentPart]
    options: AiCallOptions,                            # Pydantic: AiCallOptions (REQUIRED)
    outputFormat: Optional[str] = None,                # Proprietary: str (for document generation)
    title: Optional[str] = None                        # Proprietary: str (for document generation)
) -> AiResponse:                                       # Pydantic: AiResponse
    ...
Important:
- ✅ ContentParts only - documents must be extracted beforehand
- ✅ REQUIRED: options.operationType must be set
- ✅ No extraction logic - callAiContent() does not extract anything; it only receives ContentParts that were extracted earlier
Operation Types:
from enum import Enum

class OperationTypeEnum(str, Enum):
    PLAN = "plan"                      # Planning operations
    DATA_ANALYSE = "dataAnalyse"       # Data analysis
    DATA_GENERATE = "dataGenerate"     # Data generation
    DATA_EXTRACT = "dataExtract"       # Data extraction
    IMAGE_ANALYSE = "imageAnalyse"     # Image analysis
    IMAGE_GENERATE = "imageGenerate"   # Image generation
    WEB_SEARCH = "webSearch"           # Web search (returns URLs)
    WEB_CRAWL = "webCrawl"             # Web crawl (returns content)
Call flow for document generation:
callAiContent(prompt, contentParts, options, outputFormat, title)
↓
_preparePromptWithContentParts(prompt, contentParts) → str
↓
buildGenerationPrompt(outputFormat, prompt, title, extracted_content, ...) → str
↓
_callAiWithLooping(generation_prompt, options, ...) → str (JSON)
├─> Iteration 1: AI generates JSON
├─> Check complete_response flag
├─> Extract sections
├─> Repair broken JSON if needed
└─> Loop up to 50 iterations (if incomplete)
↓
parseJsonWithModel(response, UnifiedJsonDocument) → UnifiedJsonDocument
↓
generationService.renderReport(unifiedJson, outputFormat, ...) → bytes, mimeType
↓
AiResponse(content=json.dumps(unifiedJson), metadata=..., documents=[...])
JSON Accumulation for Iterative Calls
Problem: the AI returns JSON strings, and these can be truncated for large documents.
Solution: JSON string accumulation with KPI-based validation.
State Management:
class JsonAccumulationState(BaseModel):
    accumulatedJsonString: str                   # Raw accumulated JSON string
    isAccumulationMode: bool                     # True if we're accumulating fragments
    lastParsedResult: Optional[Dict[str, Any]]   # Last successfully parsed result
    allSections: List[Dict[str, Any]]            # Sections extracted so far
    kpis: List[Dict[str, Any]]                   # KPI definitions with current values
    lastKpi: Optional[int] = None                # Assumed field: last progress % reported by the AI (read by validateKpiProgression)
Flow Logic:
Phase 1: First Iteration Check
    1. Receive JSON string from AI
    2. Try to parse:
       - SUCCESS + Complete → Extract sections → DONE (no accumulation)
       - FAILURE or INCOMPLETE → Enter accumulation mode
Phase 2: Accumulation Mode (if needed)
    For each iteration:
    1. Receive newFragmentString
    2. Concatenate with overlap handling
    3. Try to parse accumulatedJsonString:
       - SUCCESS → Go to Phase 3 (completion)
       - FAILURE → Continue accumulation
    4. Extract partial sections (for prompt context)
    5. Build continuation context for next prompt
    6. Extract KPI from AI response and validate progression
    7. Keep accumulatedJsonString for next iteration
Phase 3: Completion (when parsing succeeds)
    1. Analyze completeness
    2. Add closing elements if needed
    3. Repair if corrupted
    4. Extract final sections
    5. DONE
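Step 2 of the accumulation mode ("Concatenate with overlap handling") is not specified further. One plausible reading: the continuation may repeat the tail of what was already received, and that repeated part should be appended only once. The sketch below follows that reading. The name concatWithOverlap and the minimum overlap of 8 characters, which keeps a lone "}" or "," from counting as an overlap, are assumptions.

```python
def concatWithOverlap(accumulated: str, fragment: str, minOverlap: int = 8) -> str:
    """Append fragment, dropping any prefix that repeats the tail of accumulated."""
    maxLen = min(len(accumulated), len(fragment))
    # Search for the longest suffix of accumulated that equals a prefix of fragment
    for size in range(maxLen, minOverlap - 1, -1):
        if accumulated.endswith(fragment[:size]):
            return accumulated + fragment[size:]
    return accumulated + fragment  # no sufficiently long overlap: plain concatenation
```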
Phase 4: Rendering
Generation Service: renderReport()
Signature:
def renderReport(
unifiedJson: UnifiedJsonDocument,
outputFormat: str,
title: Optional[str] = None,
...
) -> Tuple[bytes, str] # (rendered_bytes, mimeType)
Format renderers:
- Each format has its own renderer (DOCX, PDF, XLSX, HTML, etc.)
- Renderers receive a UnifiedJsonDocument with its sections structure
- Renderers generate format-specific output
Unified JSON Structure:
{
  "metadata": {
    "title": "Document Title",
    "filename": "document.docx"
  },
  "documents": [
    {
      "sections": [
        {
          "id": "section_1",
          "content_type": "heading",
          "title": "Section Title",
          "order": 1,
          "elements": [
            {
              "text": "Heading Text",
              "level": 1
            }
          ]
        },
        {
          "id": "section_2",
          "content_type": "table",
          "title": "Data Table",
          "order": 2,
          "elements": [
            {
              "caption": "Table Caption",
              "headers": ["Column 1", "Column 2"],
              "rows": [
                ["Value 1", "Value 2"]
              ]
            }
          ]
        }
      ]
    }
  ]
}
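As an illustration of how a format renderer might walk this structure, here is a minimal HTML sketch using only the standard library. It is not the project's HTMLRenderer. It handles the heading and table elements shown above, and assumes that paragraph elements carry a "text" field. Other content types are skipped, and sections are emitted in their "order".

```python
import html
from typing import Any, Dict, List


def renderSection(section: Dict[str, Any]) -> str:
    """Render one section of the unified JSON structure to an HTML fragment."""
    kind = section.get("content_type")
    parts: List[str] = []
    for element in section.get("elements", []):
        if kind == "heading":
            level = min(max(int(element.get("level", 1)), 1), 6)  # clamp to h1..h6
            parts.append(f"<h{level}>{html.escape(element.get('text', ''))}</h{level}>")
        elif kind == "table":
            head = "".join(f"<th>{html.escape(str(h))}</th>" for h in element.get("headers", []))
            rows = "".join(
                "<tr>" + "".join(f"<td>{html.escape(str(c))}</td>" for c in row) + "</tr>"
                for row in element.get("rows", [])
            )
            caption = html.escape(element.get("caption", ""))
            parts.append(f"<table><caption>{caption}</caption><tr>{head}</tr>{rows}</table>")
        elif kind == "paragraph":  # assumed element shape: {"text": ...}
            parts.append(f"<p>{html.escape(element.get('text', ''))}</p>")
    return "".join(parts)


def renderHtml(unifiedJson: Dict[str, Any]) -> str:
    title = html.escape(unifiedJson.get("metadata", {}).get("title", ""))
    body: List[str] = []
    for document in unifiedJson.get("documents", []):
        # Sections are rendered by their declared order, not by list position
        for section in sorted(document.get("sections", []), key=lambda s: s.get("order", 0)):
            body.append(renderSection(section))
    return f"<html><head><title>{title}</title></head><body>{''.join(body)}</body></html>"
```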
Generic vs. Specific Components
Generic Components (Breadth)
Extractors: one extractor per document type
- PDFExtractor
- DOCXExtractor
- XLSXExtractor
- ImageExtractor
- etc.
Renderers: one renderer per format
- DOCXRenderer
- PDFRenderer
- XLSXRenderer
- HTMLRenderer
- etc.
Specific Components (Depth)
Generators: one generator per document type (optional)
- ReportGenerator (for reports)
- AnalysisGenerator (for analyses)
- DataGenerator (for data documents)
Validators: one validator per content type
- TableValidator (for tables)
- ListValidator (for lists)
- TextValidator (for text)
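The "one validator per content type" idea amounts to dispatching on a section's content_type. In the minimal registry below, plain functions stand in for TableValidator and ListValidator. The checks they perform (cell count per row, non-empty list) are invented for illustration, and content types without a registered validator are skipped.

```python
from typing import Any, Callable, Dict, List

# Each validator returns a list of issue strings; an empty list means the element is valid
Validator = Callable[[Dict[str, Any]], List[str]]


def validateTable(element: Dict[str, Any]) -> List[str]:
    headers = element.get("headers", [])
    issues = []
    for i, row in enumerate(element.get("rows", [])):
        if len(row) != len(headers):
            issues.append(f"row {i} has {len(row)} cells, expected {len(headers)}")
    return issues


def validateList(element: Dict[str, Any]) -> List[str]:
    return [] if element.get("items") else ["list has no items"]


VALIDATORS: Dict[str, Validator] = {"table": validateTable, "list": validateList}


def validateSections(sections: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    report: Dict[str, List[str]] = {}
    for section in sections:
        validator = VALIDATORS.get(section.get("content_type", ""))
        if validator is None:
            continue  # no specific validator registered for this content type
        issues = [issue for el in section.get("elements", []) for issue in validator(el)]
        if issues:
            report[section.get("id", "?")] = issues
    return report
```

Adding support for a new content type then means registering one more entry in VALIDATORS. The dispatch loop itself does not change.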
Standardized Data Models
Extraction Models
ContentPart
The basic unit of extracted content:
class ContentPart(BaseModel):
    id: str = Field(description="Unique content part identifier")
    parentId: Optional[str] = Field(default=None, description="Optional parent content part id")
    label: str = Field(description="Human readable label of the part")
    typeGroup: str = Field(description="Logical type group: text, table, structure, binary, ...")
    mimeType: str = Field(description="MIME type of the part payload")
    data: str = Field(default="", description="Primary data payload, often extracted text")
    metadata: Dict[str, Any] = Field(default_factory=dict, description="Arbitrary metadata for the part")
ContentExtracted
Container for extracted ContentParts:
class ContentExtracted(BaseModel):
    id: str = Field(description="Extraction id or source document id")
    parts: List[ContentPart] = Field(default_factory=list, description="List of extracted parts")
    summary: Optional[Dict[str, Any]] = Field(default=None, description="Optional extraction summary")
ExtractionOptions
Options for document extraction:
class ExtractionOptions(BaseModel):
    # Core extraction parameters
    prompt: str = Field(description="Extraction prompt for AI processing")
    processDocumentsIndividually: bool = Field(default=True, description="Process each document separately")
    # Image processing parameters
    imageMaxPixels: int = Field(default=1024 * 1024, ge=1, description="Maximum pixels for image processing")
    imageQuality: int = Field(default=85, ge=1, le=100, description="Image quality (1-100)")
    # Merging strategy
    mergeStrategy: MergeStrategy = Field(description="Strategy for merging extraction results")
    # Optional chunking parameters
    chunkAllowed: Optional[bool] = Field(default=None, description="Whether chunking is allowed")
    maxSize: Optional[int] = Field(default=None, description="Maximum size for processing")
    textChunkSize: Optional[int] = Field(default=None, description="Size for text chunks")
    imageChunkSize: Optional[int] = Field(default=None, description="Size for image chunks")
Generation Models
UnifiedJsonDocument
Standardized JSON format for document generation:
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field

class SectionStructure(BaseModel):
    id: str = Field(description="Section identifier")
    content_type: str = Field(description="Content type: heading, paragraph, table, list, etc.")
    title: Optional[str] = Field(default=None, description="Section title")
    order: int = Field(description="Section order")
    elements: List[Dict[str, Any]] = Field(default_factory=list, description="Section elements")

class DocumentStructure(BaseModel):
    sections: List[SectionStructure] = Field(default_factory=list, description="List of sections")

# Defined last so that the nested models above already exist when this class is created
class UnifiedJsonDocument(BaseModel):
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Document metadata")
    documents: List[DocumentStructure] = Field(default_factory=list, description="List of documents")
    statistics: Optional[Dict[str, Any]] = Field(default=None, description="Document statistics")
AiResponse
Uniform response from AI calls:
class AiResponse(BaseModel):
    content: str = Field(description="Response content (JSON string for planning, text for analysis, unified JSON for documents)")
    metadata: Optional[AiResponseMetadata] = Field(None, description="Response metadata")
    documents: Optional[List[DocumentData]] = Field(None, description="Generated documents (only for document generation operations)")

    def toJson(self) -> Dict[str, Any]:
        """Convert AI response content to JSON using enhanced stabilizing failsafe conversion methods"""
AiCallOptions
Options for AI calls:
class AiCallOptions(BaseModel):
    operationType: OperationTypeEnum = Field(default=OperationTypeEnum.DATA_ANALYSE, description="Type of operation")
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level")
    compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
    compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
    processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately")
    maxCost: Optional[float] = Field(default=None, description="Max cost budget")
    maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
    processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode")
    resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
    safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits")
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation")
    maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch")
Validator Models
ValidationMetadata
Validation metadata (in ActionDocument):
# In ActionDocument
validationMetadata: Optional[Dict[str, Any]] = Field(
    None,
    description="Validation metadata for content validation and refinement"
)
Standard structure:
validationMetadata = {
    "actionName": "ai.process",
    "actionParameters": {
        "resultType": "docx",
        "columnsPerRow": 10
    },
    "extractionMethod": "full",
    "sourceDocuments": ["doc1.pdf", "doc2.pdf"],
    # ... further parameters relevant to validation
}
ValidationResult
Result of content validation:
{
    "overallSuccess": bool,
    "qualityScore": float,  # 0.0-1.0
    "dataTypeMatch": bool,
    "formatMatch": bool,
    "documentCount": int,
    "criteriaMapping": [
        {
            "index": int,
            "criterion": str,
            "met": bool,
            "reason": str
        }
    ],
    "gapAnalysis": str,
    "gapType": str,  # "missing_data" | "wrong_structure" | "wrong_format" | "incomplete_data" | "no_gap"
    "structureComparison": {
        "required": Dict[str, Any],
        "found": Dict[str, Any],
        "gap": Dict[str, Any]
    },
    "improvementSuggestions": List[str],
    "validationDetails": [
        {
            "documentName": str,
            "issues": List[str],
            "suggestions": List[str]
        }
    ]
}
Validation for Iterative Calls
JSON Accumulation with KPI Validation
Problem: for large documents, the validator cannot check the entire document.
Solution: KPI-based validation during generation.
KPI definition:
kpis: List[Dict[str, Any]] = [
    {
        "id": "table_rows",
        "description": "Number of table rows",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='table')].elements[0].rows",
        "targetValue": 100,
        "currentValue": 45
    },
    {
        "id": "list_items",
        "description": "Number of list items",
        "jsonPath": "$.documents[0].sections[?(@.content_type=='list')].elements[0].items",
        "targetValue": 50,
        "currentValue": 30
    }
]
KPI validation during accumulation:
# In each iteration:
1. Extract KPI from AI response (the AI answers with a percentage)
2. Validate KPI progression:
   - If % goes DOWN → Error (e.g., no data received, started over) → Return False
   - If % doesn't move (increment < 1%) → Error (no progress) → Return False
   - If % goes UP (increment >= 1%) → Good progress → Return True
3. Store KPI in accumulation state
4. Continue accumulation if valid, else stop
KPI question for the AI:
=== PROGRESS INDICATOR ===
Based on the delivered data so far, approximately what percentage (%) of the total
required content has been delivered?
Respond with an integer between 0-100.
⚠️ IMPORTANT:
- If percentage goes DOWN in next iteration → Generation will stop (error detected)
- If percentage doesn't increase by at least 1% → Generation will stop (no progress)
- Only continue if percentage increases by 1% or more
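The free-text answer to this question has to be turned into a number before the progression rules can be applied. A hedged sketch follows; both function names are hypothetical. The parser prefers an explicit "NN%", falls back to a bare integer answer, and rejects values above 100. The progression check applies the same rule as validateKpiProgression further below.

```python
import re
from typing import Optional


def parseProgressPercentage(responseText: str) -> Optional[int]:
    """Return the reported percentage (0-100), or None if none can be read."""
    # Prefer an explicit "NN%"; otherwise accept an answer that is just an integer
    match = re.search(r"(\d{1,3})\s*%", responseText)
    if match is None:
        match = re.fullmatch(r"\s*(\d{1,3})\s*", responseText)
    if match is None:
        return None
    value = int(match.group(1))
    return value if value <= 100 else None


def isProgressValid(lastKpi: Optional[int], currentKpi: int) -> bool:
    """A drop, or a gain of less than 1 point, stops generation."""
    return currentKpi - (lastKpi or 0) >= 1
```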
Validation with KPIs
Challenge: Large Documents
Problem: documents can be very large (several MB), so the validator cannot check the entire document.
Solution: a generic, KPI-based validation approach.
KPI-Based Validation
1. KPI Definition
KPIs are created during task definition:
class KpiDefinition(BaseModel):
    id: str = Field(description="KPI identifier")
    description: str = Field(description="Human-readable description")
    jsonPath: str = Field(description="JSONPath expression to extract value")
    targetValue: Union[int, float, str] = Field(description="Target value")
    currentValue: Optional[Union[int, float, str]] = Field(default=None, description="Current value")
    tolerance: Optional[float] = Field(default=0.0, description="Tolerance for numeric values")
2. KPI Extraction During Generation
During JSON accumulation, the KPIs are updated continuously:
def extractKpisFromJson(
    jsonData: Dict[str, Any],
    kpiDefinitions: List[KpiDefinition]
) -> List[Dict[str, Any]]:
    """Extract current KPI values from JSON using JSONPath"""
    updatedKpis = []
    for kpi in kpiDefinitions:
        # Extract value using JSONPath
        currentValue = jsonPath(jsonData, kpi.jsonPath)
        updatedKpis.append({
            "id": kpi.id,
            "description": kpi.description,
            "jsonPath": kpi.jsonPath,
            "targetValue": kpi.targetValue,
            "currentValue": currentValue,
            "progress": calculateProgress(currentValue, kpi.targetValue)
        })
    return updatedKpis
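The helpers jsonPath and calculateProgress are not defined in this document. The sketch below shows one way they could behave for the two example KPIs. countSectionItems is a hypothetical stand-in for the JSONPath expressions: it uses only the first section with the given content_type, whereas a real JSONPath filter returns every match. It also returns a count, which matches the numeric currentValue in the KPI examples. calculateProgress is written here as a percentage capped at 100, which is likewise an assumption.

```python
from typing import Any, Dict, Union


def countSectionItems(jsonData: Dict[str, Any], contentType: str, field: str) -> int:
    """Count elements[0][field] in the first section of documents[0] with the given content_type."""
    documents = jsonData.get("documents", [])
    if not documents:
        return 0
    for section in documents[0].get("sections", []):
        if section.get("content_type") == contentType and section.get("elements"):
            return len(section["elements"][0].get(field, []))
    return 0


def calculateProgress(currentValue: Union[int, float, None], targetValue: Union[int, float]) -> float:
    """Progress in percent, capped at 100; 0 if nothing was extracted yet."""
    if not currentValue or not targetValue:
        return 0.0
    return min(100.0, 100.0 * currentValue / targetValue)
```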
3. KPI Validation
def validateKpiProgression(
    accumulationState: JsonAccumulationState,
    currentKpi: int
) -> bool:
    """Validate KPI progression from AI response"""
    lastKpi = accumulationState.lastKpi if accumulationState.lastKpi else 0
    increment = currentKpi - lastKpi
    if increment < 0:
        return False  # Went down - error
    if increment < 1:
        return False  # No progress - error
    return True  # Progress - good
4. Content Validator with KPI Support
The Content Validator uses KPIs to validate large documents:
async def validateContent(
    self,
    documents: List[Any],
    intent: Dict[str, Any],
    taskStep: Optional[Any] = None,
    actionName: Optional[str] = None,
    actionParameters: Optional[Dict[str, Any]] = None,
    actionHistory: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
    """Validates delivered content against user intent using AI"""
    # Extract KPI definitions from taskStep or intent
    kpiDefinitions = extractKpiDefinitions(taskStep, intent)
    # For large documents: Use KPI-based validation
    if isLargeDocument(documents):
        return await self._validateWithKpis(documents, intent, kpiDefinitions, taskStep)
    # For small documents: Use full content validation
    return await self._validateWithAI(documents, intent, taskStep, actionName, actionParameters, actionHistory)
Structure-Based Validation
For large documents, only the structure is validated, not the entire content:
def _summarizeJsonStructure(self, jsonData: Any) -> Dict[str, Any]:
    """Summarize JSON document structure for validation"""
    summary = {
        "metadata": {},
        "sections": [],
        "statistics": {}
    }
    # Extract metadata
    metadata = jsonData.get("metadata", {})
    if metadata and isinstance(metadata, dict):
        summary["metadata"] = dict(metadata)
    # Extract sections with statistics
    documents = jsonData.get("documents", [])
    if documents:
        summary["statistics"]["documentCount"] = len(documents)
        if len(documents) > 0:
            doc = documents[0]
            docSections = doc.get("sections", [])
            summary["statistics"]["sectionCount"] = len(docSections)
            # Summarize sections (without full content)
            for section in docSections:
                sectionSummary = {
                    "id": section.get("id"),
                    "content_type": section.get("content_type"),
                    "title": section.get("title"),
                    "order": section.get("order")
                }
                # For tables: extract caption and statistics
                if section.get("content_type") == "table":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        tableElement = elements[0]
                        sectionSummary["caption"] = tableElement.get("caption")
                        headers = tableElement.get("headers", [])
                        rows = tableElement.get("rows", [])
                        sectionSummary["columnCount"] = len(headers)
                        sectionSummary["rowCount"] = len(rows)
                        sectionSummary["headers"] = headers
                # For lists: extract item count
                elif section.get("content_type") == "list":
                    elements = section.get("elements", [])
                    if elements and len(elements) > 0:
                        listElement = elements[0]
                        items = listElement.get("items", [])
                        sectionSummary["itemCount"] = len(items)
                summary["sections"].append(sectionSummary)
    return summary
Advantages:
- ✅ Validates the structure without handing the entire content to the validator
- ✅ Uses statistics (number of rows, items, etc.) for validation
- ✅ Scales to very large documents (GB range)
Code Structure
Gateway Architecture
gateway/
├── modules/
│ ├── routes/ # API Routes
│ ├── security/ # Security & Middlewares
│ ├── shared/ # Shared Utilities
│ ├── services/ # Business Logic Services
│ │ ├── serviceAi.py # AI Service (callAiPlanning, callAiContent)
│ │ ├── serviceDocumentExtraction.py # Extraction Service
│ │ └── serviceDocumentGeneration.py # Generation Service
│ ├── interfaces/ # Interface Layer
│ │ └── interfaceAiObjects.py # AI Objects Interface
│ ├── connectors/ # External Connectors
│ │ ├── connectorAiOpenai.py
│ │ ├── connectorAiAnthropic.py
│ │ └── ...
│ ├── datamodels/ # Pydantic Models
│ │ ├── datamodelAi.py
│ │ ├── datamodelExtraction.py
│ │ ├── datamodelWorkflow.py
│ │ └── ...
│ └── workflows/ # Workflow Processing
│ ├── processing/
│ │ ├── core/
│ │ │ └── validator.py # Workflow Validator
│ │ └── adaptive/
│ │ └── contentValidator.py # Content Validator
│ └── methods/
│ ├── methodBase.py
│ ├── methodAi/
│ │ ├── methodAi.py
│ │ └── actions/
│ │ ├── process.py
│ │ └── ...
│ └── methodContext/
│ └── actions/
│ ├── extractContent.py
│ └── neutralizeData.py
Service Dependency Hierarchy
aiService (independent)
└─> No dependencies
generationService (independent)
└─> No dependencies
extractionService (independent)
└─> No dependencies
chatService (depends on aiService, generationService)
├─> Uses: aiService (for AI calls)
└─> Uses: generationService (for document rendering)
workflowProcessor (depends on all services)
├─> Uses: chatService
├─> Uses: aiService
├─> Uses: generationService
└─> Uses: extractionService
Important Files
AI Service
- gateway/modules/services/serviceAi.py: central AI service implementation
  - callAiPlanning(): for planning operations
  - callAiContent(): for content processing (unified)
Extraction Service
- gateway/modules/services/serviceDocumentExtraction.py: Extraction Service
  - extractContent(): Phase 1 - pure extraction
  - processDocumentsPerChunk(): Phase 2 - AI-based extraction with chunking
Generation Service
- gateway/modules/services/serviceDocumentGeneration.py: Generation Service
  - renderReport(): renders unified JSON to the target format
Validators
- gateway/modules/workflows/processing/core/validator.py: Workflow Validator
  - validateTask(): validates the task structure
  - validateAction(): validates the action structure
- gateway/modules/workflows/processing/adaptive/contentValidator.py: Content Validator
  - validateContent(): validates generated content against the user's intent
  - _validateWithAI(): AI-based validation
  - _summarizeJsonStructure(): structure summary for large documents
Data Models
- gateway/modules/datamodels/datamodelAi.py: AI models
  - AiCallOptions, AiResponse, JsonAccumulationState, etc.
- gateway/modules/datamodels/datamodelExtraction.py: extraction models
  - ContentPart, ContentExtracted, ExtractionOptions, etc.
- gateway/modules/datamodels/datamodelWorkflow.py: workflow models
  - ChatWorkflow, TaskContext, ActionDefinition, etc.
Other Important Topics
1. Action Registry Pattern
Each action function is defined in the same module as its Pydantic parameter model:
# In methodAi/actions/process.py
class AiProcessParameters(BaseModel):
    """Parameters for AI processing action"""
    aiPrompt: str = Field(description="AI instruction prompt")
    contentParts: Optional[List[ContentPart]] = Field(None, description="Already-extracted content parts")
    resultType: str = Field(default="txt", description="Output file extension")

async def process(parameters: AiProcessParameters) -> ActionResult:
    """Execute AI processing action"""
    # Implementation...
Advantages:
- ✅ Type safety: parameters are validated at runtime
- ✅ Co-location: the action and its parameters are defined together
- ✅ Extensibility: new actions can be added easily
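The document does not show how actions are looked up by their compound name. A minimal registry sketch follows; registerAction, ACTION_REGISTRY and executeAction are hypothetical names. To stay self-contained it uses a synchronous function and a stdlib dataclass, not an async action with a Pydantic model. A dataclass rejects unknown or missing fields but, unlike Pydantic, does not check types.

```python
from dataclasses import dataclass, fields
from typing import Any, Callable, Dict, Tuple, Type

# Registry: compound action name ("method.action") -> (parameter class, action function)
ACTION_REGISTRY: Dict[str, Tuple[Type[Any], Callable[[Any], Any]]] = {}


def registerAction(name: str, paramsModel: Type[Any]):
    """Decorator that stores an action together with its parameter model."""
    def decorator(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
        ACTION_REGISTRY[name] = (paramsModel, func)
        return func
    return decorator


@dataclass
class AiProcessParams:
    aiPrompt: str
    resultType: str = "txt"


@registerAction("ai.process", AiProcessParams)
def process(parameters: AiProcessParams) -> str:
    return f"{parameters.resultType}: {parameters.aiPrompt}"


def executeAction(name: str, rawParameters: Dict[str, Any]) -> Any:
    paramsModel, func = ACTION_REGISTRY[name]
    unknown = set(rawParameters) - {f.name for f in fields(paramsModel)}
    if unknown:
        raise ValueError(f"Unknown parameters for {name}: {sorted(unknown)}")
    # Constructing the parameter object raises TypeError if a required field is missing
    return func(paramsModel(**rawParameters))
```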
2. Validation Metadata Pattern
All actions that return an ActionDocument MUST include validationMetadata:
# In MethodBase
def _createValidationMetadata(self, actionName: str, **kwargs) -> Dict[str, Any]:
    """Create standardized validationMetadata for ActionDocument instances"""
    return {
        "actionName": actionName,
        **kwargs  # Action-specific parameters
    }

# In Action
validationMetadata = self._createValidationMetadata(
    actionName="ai.process",
    resultType="docx",
    columnsPerRow=10,
    extractionMethod="full"
)
return ActionResult(
    success=True,
    documents=[
        ActionDocument(
            documentName="report.docx",
            documentData=renderedBytes,
            mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            validationMetadata=validationMetadata  # REQUIRED
        )
    ]
)
3. Document Reference System
Documents are referenced via typed references:
class DocumentReference(BaseModel):
    """Base class for document references"""
    pass

class DocumentListReference(DocumentReference):
    """Reference to a document list via message label"""
    messageId: Optional[str] = None
    label: str

    def to_string(self) -> str:
        if self.messageId:
            return f"docList:{self.messageId}:{self.label}"
        return f"docList:{self.label}"

class DocumentItemReference(DocumentReference):
    """Reference to a specific document item"""
    documentId: str
    fileName: Optional[str] = None

    def to_string(self) -> str:
        if self.fileName:
            return f"docItem:{self.documentId}:{self.fileName}"
        return f"docItem:{self.documentId}"
Usage:
# In Action Definition
documentList = DocumentReferenceList(
    references=[
        DocumentListReference(messageId="msg_123", label="task1_results"),
        DocumentItemReference(documentId="doc_456", fileName="report.pdf")
    ]
)
# Convert to string list for action parameters
stringRefs = documentList.to_string_list()
# ["docList:msg_123:task1_results", "docItem:doc_456:report.pdf"]
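The inverse direction, from string back to a typed reference, is not shown. One way to parse the two formats produced by to_string() is sketched below. DocListRef, DocItemRef and parseReference are hypothetical stdlib stand-ins for the Pydantic classes. Note an ambiguity in the format: a label that itself contains ":" and has no messageId would be read as messageId plus label.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class DocListRef:
    label: str
    messageId: Optional[str] = None


@dataclass
class DocItemRef:
    documentId: str
    fileName: Optional[str] = None


def parseReference(ref: str) -> Union[DocListRef, DocItemRef]:
    kind, _, rest = ref.partition(":")
    head, sep, tail = rest.partition(":")  # split once; a fileName may contain further colons
    if kind == "docList":
        # "docList:<messageId>:<label>" or "docList:<label>"
        return DocListRef(label=tail, messageId=head) if sep else DocListRef(label=head)
    if kind == "docItem":
        # "docItem:<documentId>:<fileName>" or "docItem:<documentId>"
        return DocItemRef(documentId=head, fileName=tail if sep else None)
    raise ValueError(f"Unknown reference type: {ref!r}")
```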
4. Document Persistence Strategy
Phase 1: Within a Running Task
- Documents are passed directly as ActionResult + ActionDocument
- NO ChatDocuments are created
- Documents are ephemeral but available within the task
Phase 2: Between Tasks/Rounds
- Documents are persisted via ChatMessage + ChatDocuments
- Required for docList: references
- documentsLabel is set so the documents can be referenced later
Phase 3: User Delivery
- Complete ChatMessage (in the user's language) + ChatDocuments
- For user-facing workflows
5. Error Handling and Retry Logic
Model Failover
- If a model fails, the next model in the failover list is selected automatically
- The chunk size is recalculated for the new model
- Processing continues with the new model
JSON Repair
- Broken JSON strings are passed to repairBrokenJson()
- It attempts to repair the JSON structure
- If the repair fails, processing continues with the previously accumulated string
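The internals of repairBrokenJson() are not described. A common technique for truncated output is sketched below under the hypothetical name closeTruncatedJson. It tracks open brackets outside of strings, closes a dangling string, drops a trailing comma, appends the missing closers, and returns None when the result still does not parse. Returning None leaves room for the fallback described above.

```python
import json
from typing import List, Optional


def closeTruncatedJson(text: str) -> Optional[str]:
    """Try to make a truncated JSON string parseable by closing open strings and brackets."""
    stack: List[str] = []  # expected closers, innermost last
    inString = False
    escaped = False
    for ch in text:
        if inString:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                inString = False
        elif ch == '"':
            inString = True
        elif ch in "{[":
            stack.append("}" if ch == "{" else "]")
        elif ch in "}]":
            if not stack or stack.pop() != ch:
                return None  # mismatched bracket: not a simple truncation
    repaired = text
    if inString:
        if escaped:
            repaired = repaired[:-1]  # drop a dangling backslash
        repaired += '"'
    repaired = repaired.rstrip()
    if repaired.endswith(","):
        repaired = repaired[:-1]  # a trailing comma is invalid before a closer
    repaired += "".join(reversed(stack))
    try:
        json.loads(repaired)
        return repaired
    except json.JSONDecodeError:
        return None
```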
6. Progress Logging
Progress is logged at several levels:
# In AI Service
self.services.chat.progressLogStart(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    message="Starting AI call..."
)
# ... processing ...
self.services.chat.progressLogFinish(
    workflowId=workflow.id,
    roundIndex=workflow.getRoundIndex(),
    taskIndex=workflow.getTaskIndex(),
    actionIndex=workflow.getActionIndex(),
    success=True
)
7. Debugging and Tracing
Debug Files
- AI prompts are saved as debug files
- JSON responses are saved as debug files
- A debug type is used to categorize them
JSON Extraction Utilities
- extractJsonString(): extracts JSON from text wrapped in code fences
- tryParseJson(): safe JSON parsing with error handling
- repairBrokenJson(): repairs broken/incomplete JSON strings
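The first two utilities could look roughly like the sketch below. It reuses the names extractJsonString and tryParseJson, but the behaviour is assumed, not taken from the codebase. extractJsonString returns the body of the first code fence, otherwise the span from the first "{" or "[" to the last "}" or "]". tryParseJson returns a (value, error) pair and never raises. The fence marker is built as "`" * 3 so the pattern does not contain a literal fence.

```python
import json
import re
from typing import Any, Optional, Tuple

FENCE = "`" * 3
_FENCED = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def extractJsonString(text: str) -> str:
    """Return the body of the first fenced block, else the outermost {...} or [...] span."""
    match = _FENCED.search(text)
    if match:
        return match.group(1).strip()
    starts = [i for i in (text.find("{"), text.find("[")) if i != -1]
    if not starts:
        return text.strip()
    start = min(starts)
    end = max(text.rfind("}"), text.rfind("]"))
    return text[start:end + 1] if end > start else text[start:]


def tryParseJson(text: str) -> Tuple[Optional[Any], Optional[str]]:
    """Parse without raising: returns (value, None) or (None, error message)."""
    try:
        return json.loads(extractJsonString(text)), None
    except json.JSONDecodeError as exc:
        return None, str(exc)
```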
Summary
The PowerOn AI system uses:
- A dynamic workflow with a Round → Task → Action hierarchy
- Standardized data models for all components
- A four-phase model: Extraction → Neutralization → Generation → Rendering
- KPI-based validation for large documents
- Model-aware chunking for efficient use of resources
- JSON accumulation for iterative generation
- Type-safe parameter validation through Pydantic models
- Structure-based validation for large documents
All components are generic and extensible, so new actions, extractors, renderers and validators can be added easily.