gateway/modules/datamodels/datamodelWorkflow.py
2026-04-10 12:33:27 +02:00

303 lines
14 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Workflow execution models for action definitions, AI responses, and workflow-level structures.
"""
from typing import Dict, Any, List, Optional, TYPE_CHECKING
from pydantic import BaseModel, Field
from modules.shared.i18nRegistry import i18nModel
from modules.shared.jsonUtils import extractJsonString, tryParseJson, repairBrokenJson
# Import DocumentReferenceList at runtime (needed for ActionDefinition)
from modules.datamodels.datamodelDocref import DocumentReferenceList
@i18nModel("Aktionsdefinition")
class ActionDefinition(BaseModel):
    """Action definition with selection and parameters from planning phase"""

    # ---- Stage 1: action selection --------------------------------------
    action: str = Field(
        description="Compound action name (method.action)",
        json_schema_extra={"label": "Aktion"},
    )
    actionObjective: str = Field(
        description="Objective for this action",
        json_schema_extra={"label": "Aktionsziel"},
    )
    userMessage: Optional[str] = Field(
        default=None,
        description="User-friendly message in user's language explaining what this action will do (generated by AI in prompts)",
        json_schema_extra={"label": "Benutzernachricht"},
    )
    parametersContext: Optional[str] = Field(
        default=None,
        description="Context for parameter generation",
        json_schema_extra={"label": "Parameter-Kontext"},
    )
    learnings: List[str] = Field(
        default_factory=list,
        description="Learnings from previous actions",
        json_schema_extra={"label": "Erkenntnisse"},
    )

    # ---- Stage 1: resources the action works on -------------------------
    documentList: Optional[DocumentReferenceList] = Field(
        default=None,
        description="Document references (ALWAYS defined in Stage 1 if action needs documents)",
        json_schema_extra={"label": "Dokumentenliste"},
    )
    connectionReference: Optional[str] = Field(
        default=None,
        description="Connection reference (ALWAYS defined in Stage 1 if action needs connection)",
        json_schema_extra={"label": "Verbindungsreferenz"},
    )

    # ---- Stage 1 or Stage 2: action-specific parameters -----------------
    # None means "not generated yet"; any other value (including an empty
    # dict) counts as generated for hasParameters() / needsStage2().
    parameters: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Action-specific parameters (generated in Stage 2 for complex actions, or inferred from actionObjective for simple actions)",
        json_schema_extra={"label": "Parameter"},
    )

    def hasParameters(self) -> bool:
        """Return True when ``parameters`` holds any value other than None."""
        return self.parameters is not None

    def needsStage2(self) -> bool:
        """Return True while ``parameters`` is still None.

        The decision depends only on whether ``parameters`` is set, so it
        does not vary by action name. ``documentList`` and
        ``connectionReference`` are not consulted here.
        """
        parametersMissing = self.parameters is None
        return parametersMissing

    def updateFromStage1StringReferences(self, stringRefs: Optional[List[str]], connectionRef: Optional[str]):
        """Store Stage 1 references on this action.

        Args:
            stringRefs: Document references as strings. When truthy they are
                converted with ``DocumentReferenceList.from_string_list`` and
                assigned to ``documentList``; None or an empty list leaves
                ``documentList`` untouched.
            connectionRef: Connection reference. When truthy it is assigned
                to ``connectionReference``; None or "" leaves it untouched.
        """
        if stringRefs:
            self.documentList = DocumentReferenceList.from_string_list(stringRefs)
        if not connectionRef:
            return
        self.connectionReference = connectionRef
@i18nModel("KI-Antwort-Metadaten")
class AiResponseMetadata(BaseModel):
    """Metadata for AI response (varies by operation type)."""

    # `schemaVersion` is exposed under the alias "schema". Without this
    # setting pydantic v2 accepts only the alias on input, so
    # `AiResponseMetadata(schemaVersion=...)` or re-validating the output of
    # `model_dump()` (which uses field names by default) would silently lose
    # the value. Enabling population by name accepts both spellings; input
    # that already uses "schema" keeps working unchanged.
    # NOTE(review): assumes pydantic v2 (the file uses `json_schema_extra`) — confirm.
    model_config = {"populate_by_name": True}

    # Document generation metadata
    title: Optional[str] = Field(
        default=None,
        description="Document title",
        json_schema_extra={"label": "Titel"},
    )
    filename: Optional[str] = Field(
        default=None,
        description="Document filename",
        json_schema_extra={"label": "Dateiname"},
    )

    # Operation-specific metadata
    operationType: Optional[str] = Field(
        default=None,
        description="Type of operation performed",
        json_schema_extra={"label": "Vorgangstyp"},
    )
    schemaVersion: Optional[str] = Field(
        default=None,
        description="Schema version (e.g., 'parameters_v1')",
        alias="schema",
        json_schema_extra={"label": "Schema-Version"},
    )
    extractionMethod: Optional[str] = Field(
        default=None,
        description="Method used for extraction",
        json_schema_extra={"label": "Extraktionsmethode"},
    )
    sourceDocuments: Optional[List[str]] = Field(
        default=None,
        description="Source document references",
        json_schema_extra={"label": "Quelldokumente"},
    )

    # Free-form bucket for anything not covered by the fields above
    additionalData: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional operation-specific metadata",
        json_schema_extra={"label": "Zusätzliche Daten"},
    )
@i18nModel("Dokumentdaten")
class DocumentData(BaseModel):
    """Single document in response"""

    documentName: str = Field(
        description="Document name",
        json_schema_extra={"label": "Dokumentname"},
    )
    # Typed as Any: the payload is not constrained by this model.
    documentData: Any = Field(
        description="Document data (can be str, bytes, dict, etc.)",
        json_schema_extra={"label": "Dokumentdaten"},
    )
    mimeType: str = Field(
        description="MIME type of the document",
        json_schema_extra={"label": "MIME-Typ"},
    )
    # Optional; defaults to None when no source structure is supplied.
    sourceJson: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Source JSON structure (preserved when rendering to xlsx/docx/pdf)",
        json_schema_extra={"label": "Quell-JSON"},
    )
@i18nModel("Extraktionsparameter")
class ExtractContentParameters(BaseModel):
    """Parameters for extraction action.
    This model is defined together with the `methodAi.extractContent()` action function.
    All action parameter models follow this pattern: defined in the same module as the action.
    However, since this is a workflow-level model used across the system, it's defined here.
    """

    # Required: there is no default, so a document list must be supplied.
    documentList: DocumentReferenceList = Field(
        description="Document references to extract content from",
        json_schema_extra={"label": "Dokumentenliste"},
    )
    # Typed as Any here; per the original note the intended type is
    # ExtractionOptions, which is not imported in this module.
    extractionOptions: Optional[Any] = Field(
        default=None,
        description="Extraction options (determined dynamically based on task and document characteristics)",
        json_schema_extra={"label": "Extraktionsoptionen"},
    )
@i18nModel("KI-Antwort")
class AiResponse(BaseModel):
    """Unified response from all AI calls (planning, text, documents)"""

    content: str = Field(
        description="Response content (JSON string for planning, text for analysis, unified JSON for documents)",
        json_schema_extra={"label": "Inhalt"},
    )
    metadata: Optional[AiResponseMetadata] = Field(
        default=None,
        description="Response metadata (varies by operation type)",
        json_schema_extra={"label": "Metadaten"},
    )
    documents: Optional[List[DocumentData]] = Field(
        default=None,
        description="Generated documents (only for document generation operations)",
        json_schema_extra={"label": "Dokumente"},
    )

    def toJson(self) -> Dict[str, Any]:
        """Turn ``content`` into a dict, falling back step by step.

        Order of attempts:
            1. ``content`` already is a dict -> returned as is;
               already a list -> returned as ``{"data": content}``.
            2. ``extractJsonString`` + ``tryParseJson`` on the text form.
               A parsed dict is returned directly, a parsed list is wrapped
               as ``{"data": [...]}``. Any other parsed value (e.g. a
               number or string) is not returned and falls through.
            3. ``repairBrokenJson`` on the full text (not on the extracted
               substring). Only a non-empty dict result is returned.
            4. ``{"content": <text>, "parseError": True}``.

        Returns:
            The parsed/repaired dict, or the fallback structure from step 4.
        """
        rawContent = self.content

        # Step 1: structured values need no parsing.
        if isinstance(rawContent, dict):
            return rawContent
        if isinstance(rawContent, list):
            return {"data": rawContent}

        text = rawContent if isinstance(rawContent, str) else str(rawContent)

        # Step 2: pull a JSON candidate out of the text, then parse it.
        # Per the original note, tryParseJson yields (obj, error, cleaned_str).
        candidate = extractJsonString(text)
        parsed, error, _ = tryParseJson(candidate)
        if error is None:
            if isinstance(parsed, dict):
                return parsed
            if isinstance(parsed, list):
                return {"data": parsed}

        # Step 3: repair attempt. Per the original note the helper returns a
        # dict or None; an empty dict is falsy and is treated like a failure.
        repaired = repairBrokenJson(text)
        if repaired and isinstance(repaired, dict):
            return repaired

        # Step 4: nothing worked - hand back the text with an error flag.
        return {"content": text, "parseError": True}
# Workflow-level models
@i18nModel("Anfragekontext")
class RequestContext(BaseModel):
    """Normalized request context from user input"""

    originalPrompt: str = Field(
        description="Original user prompt",
        json_schema_extra={"label": "Ursprüngliche Eingabe"},
    )
    # Elements are typed as Any; per the original note they are meant to be
    # ChatDocument instances, which are not imported in this module.
    documents: List[Any] = Field(
        description="Documents provided by user",
        json_schema_extra={"label": "Dokumente"},
        default_factory=list,
    )
    userLanguage: str = Field(
        description="User's language",
        json_schema_extra={"label": "Benutzersprache"},
    )
    # Plain string; the model itself does not restrict it to the listed values.
    detectedComplexity: str = Field(
        description="Complexity level: simple, moderate, complex",
        json_schema_extra={"label": "Erkannte Komplexität"},
    )

    # Requirement flags, all False unless set explicitly
    requiresDocuments: bool = Field(
        default=False,
        description="Whether request requires documents",
        json_schema_extra={"label": "Benötigt Dokumente"},
    )
    requiresWebResearch: bool = Field(
        default=False,
        description="Whether request requires web research",
        json_schema_extra={"label": "Benötigt Web-Recherche"},
    )
    requiresAnalysis: bool = Field(
        default=False,
        description="Whether request requires analysis",
        json_schema_extra={"label": "Benötigt Analyse"},
    )

    # Optional output expectations
    expectedOutputFormat: Optional[str] = Field(
        default=None,
        description="Expected output format",
        json_schema_extra={"label": "Erwartetes Ausgabeformat"},
    )
    expectedOutputType: Optional[str] = Field(
        default=None,
        description="Expected output type: answer, document, analysis",
        json_schema_extra={"label": "Erwarteter Ausgabetyp"},
    )
@i18nModel("Verständnis-Ergebnis")
class UnderstandingResult(BaseModel):
    """Result from initial understanding phase (combined AI call)"""

    # Every field is optional on input and starts out as an empty container.
    parameters: Dict[str, Any] = Field(
        description="Basic parameters (language, format, detail level)",
        json_schema_extra={"label": "Parameter"},
        default_factory=dict,
    )
    intention: Dict[str, Any] = Field(
        description="User intention (primaryGoal, secondaryGoals, intentionType)",
        json_schema_extra={"label": "Absicht"},
        default_factory=dict,
    )
    context: Dict[str, Any] = Field(
        description="Extracted context (topics, requirements, constraints)",
        json_schema_extra={"label": "Kontext"},
        default_factory=dict,
    )
    documentReferences: List[Dict[str, Any]] = Field(
        description="Document references with purpose and relevance",
        json_schema_extra={"label": "Dokumentenreferenzen"},
        default_factory=list,
    )
    # "TaskDefinition" is written as a string because that class is declared
    # further down in this module.
    tasks: List["TaskDefinition"] = Field(
        description="Task definitions with deliverables",
        json_schema_extra={"label": "Aufgaben"},
        default_factory=list,
    )
@i18nModel("Aufgabenbeschreibung")
class TaskDefinition(BaseModel):
    """Task definition from understanding phase"""

    # Required fields (no default)
    id: str = Field(
        description="Task identifier",
        json_schema_extra={"label": "Aufgaben-ID"},
    )
    objective: str = Field(
        description="Task objective",
        json_schema_extra={"label": "Ziel"},
    )
    deliverable: Dict[str, Any] = Field(
        description="Deliverable specification (type, format, style, detailLevel)",
        json_schema_extra={"label": "Lieferobjekt"},
    )

    # Capability flags; note that content generation is the only one that
    # defaults to True.
    requiresWebResearch: bool = Field(
        default=False,
        description="Whether task requires web research",
        json_schema_extra={"label": "Benötigt Web-Recherche"},
    )
    requiresDocumentAnalysis: bool = Field(
        default=False,
        description="Whether task requires document analysis",
        json_schema_extra={"label": "Benötigt Dokumentenanalyse"},
    )
    requiresContentGeneration: bool = Field(
        default=True,
        description="Whether task requires content generation",
        json_schema_extra={"label": "Benötigt Inhaltserstellung"},
    )

    requiredDocuments: List[str] = Field(
        description="Document references needed for this task",
        json_schema_extra={"label": "Benötigte Dokumente"},
        default_factory=list,
    )
    # Typed as Any here; per the original note the intended type is
    # ExtractionOptions, which is not imported in this module.
    extractionOptions: Optional[Any] = Field(
        default=None,
        description="Extraction options for document processing (determined dynamically based on task and document characteristics)",
        json_schema_extra={"label": "Extraktionsoptionen"},
    )
@i18nModel("Workflow-Aufgabenergebnis")
class WorkflowTaskResult(BaseModel):
    """Result from task execution"""

    taskId: str = Field(
        description="Task identifier",
        json_schema_extra={"label": "Aufgaben-ID"},
    )
    # Typed as Any here; per the original note the value is meant to be an
    # ActionResult, which is not imported in this module.
    actionResult: Any = Field(
        description="ActionResult from task execution",
        json_schema_extra={"label": "Aktionsergebnis"},
    )