gateway/modules/datamodels/datamodelChat.py
2026-04-19 01:22:33 +02:00

499 lines
35 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Chat models: ChatWorkflow, ChatMessage, ChatLog, ChatDocument."""
from typing import List, Dict, Any, Optional
from enum import Enum
from pydantic import BaseModel, Field
from modules.datamodels.datamodelBase import PowerOnModel
from modules.shared.i18nRegistry import i18nModel
from modules.shared.timeUtils import getUtcTimestamp
import uuid
@i18nModel("Chat-Protokoll")
class ChatLog(PowerOnModel):
    """Log entries for chat workflows. User-owned, no mandate context."""

    # --- Identity -----------------------------------------------------------
    # The primary key is a random UUID4 string generated per instance.
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        json_schema_extra={"label": "ID"},
    )
    workflowId: str = Field(
        description="Foreign key to workflow",
        json_schema_extra={
            "label": "Workflow-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatWorkflow"},
        },
    )

    # --- Payload ------------------------------------------------------------
    message: str = Field(
        description="Log message",
        json_schema_extra={"label": "Nachricht"},
    )
    type: str = Field(
        description="Log type (info, warning, error, etc.)",
        json_schema_extra={"label": "Typ"},
    )
    # Defaults to the value returned by getUtcTimestamp() at construction time.
    timestamp: float = Field(
        default_factory=getUtcTimestamp,
        description="When the log entry was created (UTC timestamp in seconds)",
        json_schema_extra={"label": "Zeitstempel"},
    )
    status: Optional[str] = Field(
        default=None,
        description="Status of the log entry",
        json_schema_extra={"label": "Status"},
    )
    progress: Optional[float] = Field(
        default=None,
        description="Progress indicator (0.0 to 1.0)",
        json_schema_extra={"label": "Fortschritt"},
    )
    performance: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Performance metrics",
        json_schema_extra={"label": "Leistung"},
    )

    # --- Grouping / hierarchy -----------------------------------------------
    parentId: Optional[str] = Field(
        default=None,
        description="Parent operation ID (operationId of parent operation) for hierarchical display",
        json_schema_extra={"label": "Übergeordnete ID"},
    )
    operationId: Optional[str] = Field(
        default=None,
        description="Operation ID to group related log entries",
        json_schema_extra={"label": "Vorgangs-ID"},
    )

    # --- Position inside the workflow (round -> task -> action) -------------
    roundNumber: Optional[int] = Field(
        default=None,
        description="Round number in workflow",
        json_schema_extra={"label": "Rundennummer"},
    )
    taskNumber: Optional[int] = Field(
        default=None,
        description="Task number within round",
        json_schema_extra={"label": "Aufgabennummer"},
    )
    actionNumber: Optional[int] = Field(
        default=None,
        description="Action number within task",
        json_schema_extra={"label": "Aktionsnummer"},
    )
@i18nModel("Chat-Dokument")
class ChatDocument(PowerOnModel):
    """Documents attached to chat messages. User-owned, no mandate context."""

    # --- Identity and references --------------------------------------------
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        json_schema_extra={"label": "ID"},
    )
    messageId: str = Field(
        description="Foreign key to message",
        json_schema_extra={
            "label": "Nachrichten-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatMessage"},
        },
    )
    # The file record is declared in a different database than the chat tables.
    fileId: str = Field(
        description="Foreign key to file",
        json_schema_extra={
            "label": "Datei-ID",
            "fk_target": {"db": "poweron_management", "table": "FileItem"},
        },
    )

    # --- File descriptors ---------------------------------------------------
    fileName: str = Field(
        description="Name of the file",
        json_schema_extra={"label": "Dateiname"},
    )
    fileSize: int = Field(
        description="Size of the file",
        json_schema_extra={"label": "Dateigröße"},
    )
    mimeType: str = Field(
        description="MIME type of the file",
        json_schema_extra={"label": "MIME-Typ"},
    )

    # --- Position inside the workflow (round -> task -> action) -------------
    roundNumber: Optional[int] = Field(
        default=None,
        description="Round number in workflow",
        json_schema_extra={"label": "Rundennummer"},
    )
    taskNumber: Optional[int] = Field(
        default=None,
        description="Task number within round",
        json_schema_extra={"label": "Aufgabennummer"},
    )
    actionNumber: Optional[int] = Field(
        default=None,
        description="Action number within task",
        json_schema_extra={"label": "Aktionsnummer"},
    )
    actionId: Optional[str] = Field(
        default=None,
        description="ID of the action that created this document",
        json_schema_extra={"label": "Aktions-ID"},
    )
@i18nModel("Inhalts-Metadaten")
class ContentMetadata(BaseModel):
    # Descriptive metadata for one extracted content item.
    # Required: size, mimeType, base64Encoded. Everything else is optional
    # and defaults to None.
    size: int = Field(
        description="Content size in bytes",
        json_schema_extra={"label": "Größe"},
    )
    pages: Optional[int] = Field(
        default=None,
        description="Number of pages for multi-page content",
        json_schema_extra={"label": "Seiten"},
    )
    error: Optional[str] = Field(
        default=None,
        description="Processing error if any",
        json_schema_extra={"label": "Fehler"},
    )

    # Image / video specific attributes.
    width: Optional[int] = Field(
        default=None,
        description="Width in pixels for images/videos",
        json_schema_extra={"label": "Breite"},
    )
    height: Optional[int] = Field(
        default=None,
        description="Height in pixels for images/videos",
        json_schema_extra={"label": "Höhe"},
    )
    colorMode: Optional[str] = Field(
        default=None,
        description="Color mode",
        json_schema_extra={"label": "Farbmodus"},
    )
    fps: Optional[float] = Field(
        default=None,
        description="Frames per second for videos",
        json_schema_extra={"label": "FPS"},
    )
    durationSec: Optional[float] = Field(
        default=None,
        description="Duration in seconds for media",
        json_schema_extra={"label": "Dauer"},
    )

    # Encoding information for the accompanying data payload.
    mimeType: str = Field(
        description="MIME type of the content",
        json_schema_extra={"label": "MIME-Typ"},
    )
    base64Encoded: bool = Field(
        description="Whether the data is base64 encoded",
        json_schema_extra={"label": "Base64-kodiert"},
    )
@i18nModel("Inhaltselement")
class ContentItem(BaseModel):
    # One labelled piece of extracted content plus its metadata.
    # All three fields are required.
    label: str = Field(
        description="Content label",
        json_schema_extra={"label": "Bezeichnung"},
    )
    data: str = Field(
        description="Extracted text content",
        json_schema_extra={"label": "Daten"},
    )
    metadata: ContentMetadata = Field(
        description="Content metadata",
        json_schema_extra={"label": "Metadaten"},
    )
@i18nModel("Extrahierter Inhalt")
class ChatContentExtracted(BaseModel):
    # Extraction result: the id of the source document plus the content
    # items obtained from it (empty list when nothing was supplied).
    id: str = Field(
        description="Reference to source ChatDocument",
        json_schema_extra={"label": "Objekt-ID"},
    )
    contents: List[ContentItem] = Field(
        default_factory=list,
        description="List of content items",
        json_schema_extra={"label": "Inhalte"},
    )
@i18nModel("Chat-Nachricht")
class ChatMessage(PowerOnModel):
    """Messages in chat workflows. User-owned, no mandate context."""

    # --- Identity and references --------------------------------------------
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        json_schema_extra={"label": "ID"},
    )
    workflowId: str = Field(
        description="Foreign key to workflow",
        json_schema_extra={
            "label": "Workflow-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatWorkflow"},
        },
    )
    # Self-reference into the ChatMessage table; None for un-threaded messages.
    parentMessageId: Optional[str] = Field(
        default=None,
        description="Parent message ID for threading",
        json_schema_extra={
            "label": "Übergeordnete Nachrichten-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatMessage"},
        },
    )

    # --- Content ------------------------------------------------------------
    documents: List[ChatDocument] = Field(
        default_factory=list,
        description="Associated documents",
        json_schema_extra={"label": "Dokumente"},
    )
    documentsLabel: Optional[str] = Field(
        default=None,
        description="Label for the set of documents",
        json_schema_extra={"label": "Dokumenten-Label"},
    )
    message: Optional[str] = Field(
        default=None,
        description="Message content",
        json_schema_extra={"label": "Nachricht"},
    )
    summary: Optional[str] = Field(
        default=None,
        description="Short summary of this message for planning/history",
        json_schema_extra={"label": "Zusammenfassung"},
    )

    # --- Sender / lifecycle -------------------------------------------------
    role: str = Field(
        description="Role of the message sender",
        json_schema_extra={"label": "Rolle"},
    )
    status: str = Field(
        description="Status of the message (first, step, last)",
        json_schema_extra={"label": "Status"},
    )
    sequenceNr: Optional[int] = Field(
        default=0,
        description="Sequence number of the message (set automatically)",
        json_schema_extra={"label": "Sequenznummer"},
    )
    publishedAt: Optional[float] = Field(
        default=None,
        description="When the message was published (UTC timestamp in seconds)",
        json_schema_extra={"label": "Veröffentlicht am"},
    )
    success: Optional[bool] = Field(
        default=None,
        description="Whether the message processing was successful",
        json_schema_extra={"label": "Erfolg"},
    )

    # --- Originating action -------------------------------------------------
    actionId: Optional[str] = Field(
        default=None,
        description="ID of the action that produced this message",
        json_schema_extra={"label": "Aktions-ID"},
    )
    actionMethod: Optional[str] = Field(
        default=None,
        description="Method of the action that produced this message",
        json_schema_extra={"label": "Aktionsmethode"},
    )
    actionName: Optional[str] = Field(
        default=None,
        description="Name of the action that produced this message",
        json_schema_extra={"label": "Aktionsname"},
    )

    # --- Position inside the workflow (round -> task -> action) -------------
    roundNumber: Optional[int] = Field(
        default=None,
        description="Round number in workflow",
        json_schema_extra={"label": "Rundennummer"},
    )
    taskNumber: Optional[int] = Field(
        default=None,
        description="Task number within round",
        json_schema_extra={"label": "Aufgabennummer"},
    )
    actionNumber: Optional[int] = Field(
        default=None,
        description="Action number within task",
        json_schema_extra={"label": "Aktionsnummer"},
    )

    # --- Progress markers (free-form strings; values listed in descriptions) -
    taskProgress: Optional[str] = Field(
        default=None,
        description="Task progress status: pending, running, success, fail, retry",
        json_schema_extra={"label": "Aufgabenfortschritt"},
    )
    actionProgress: Optional[str] = Field(
        default=None,
        description="Action progress status: pending, running, success, fail",
        json_schema_extra={"label": "Aktionsfortschritt"},
    )
class WorkflowModeEnum(str, Enum):
    """Closed set of workflow modes; members are also plain strings."""

    # The values are the strings used as `frontend_options` values of
    # ChatWorkflow.workflowMode.
    WORKFLOW_DYNAMIC = "Dynamic"
    WORKFLOW_AUTOMATION = "Automation"
    WORKFLOW_CHATBOT = "Chatbot"
@i18nModel("Chat-Workflow")
class ChatWorkflow(PowerOnModel):
    """Chat workflow container. User-owned, no mandate context."""

    # --- Identity and links -------------------------------------------------
    id: str = Field(
        default_factory=lambda: str(uuid.uuid4()),
        description="Primary key",
        json_schema_extra={
            "label": "ID",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    featureInstanceId: Optional[str] = Field(
        default=None,
        description="Feature instance ID for multi-tenancy isolation",
        json_schema_extra={
            "label": "Feature-Instanz-ID",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
            "fk_target": {"db": "poweron_app", "table": "FeatureInstance"},
        },
    )
    # No fk_target here: per the description the linked entity lives outside
    # the ChatWorkflow table.
    linkedWorkflowId: Optional[str] = Field(
        default=None,
        description=(
            "Optional foreign key linking this chat to an entity outside the "
            "ChatWorkflow table (e.g. an Automation2Workflow in the GraphicalEditor "
            "AI editor chat). NULL for the default workspace chats. Combined with "
            "featureInstanceId this gives a 1:1 relation entity ↔ chat per feature."
        ),
        json_schema_extra={
            "label": "Verknüpfter Workflow",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )

    # --- Lifecycle ----------------------------------------------------------
    # NOTE(review): unlike the other fields of this model, `status` carries no
    # "label" entry in json_schema_extra — confirm whether that is intended.
    status: str = Field(
        default="running",
        description="Current status of the workflow",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": False,
            "frontend_required": False,
            "frontend_options": [
                {"value": "running", "label": "Running"},
                {"value": "completed", "label": "Completed"},
                {"value": "stopped", "label": "Stopped"},
                {"value": "error", "label": "Error"},
            ],
        },
    )
    name: Optional[str] = Field(
        default=None,
        description="Name of the workflow",
        json_schema_extra={
            "label": "Name",
            "frontend_type": "text",
            "frontend_readonly": False,
            "frontend_required": True,
        },
    )

    # --- Execution counters (all start at 0) --------------------------------
    currentRound: int = Field(
        default=0,
        description="Current round number",
        json_schema_extra={
            "label": "Aktuelle Runde",
            "frontend_type": "integer",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    currentTask: int = Field(
        default=0,
        description="Current task number",
        json_schema_extra={
            "label": "Aktuelle Aufgabe",
            "frontend_type": "integer",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    currentAction: int = Field(
        default=0,
        description="Current action number",
        json_schema_extra={
            "label": "Aktuelle Aktion",
            "frontend_type": "integer",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    totalTasks: int = Field(
        default=0,
        description="Total number of tasks in the workflow",
        json_schema_extra={
            "label": "Aufgaben gesamt",
            "frontend_type": "integer",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    totalActions: int = Field(
        default=0,
        description="Total number of actions in the workflow",
        json_schema_extra={
            "label": "Aktionen gesamt",
            "frontend_type": "integer",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )

    # --- Timestamps (both default to getUtcTimestamp() at construction) -----
    lastActivity: float = Field(
        default_factory=getUtcTimestamp,
        description="Timestamp of last activity (UTC timestamp in seconds)",
        json_schema_extra={
            "label": "Letzte Aktivität",
            "frontend_type": "timestamp",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    startedAt: float = Field(
        default_factory=getUtcTimestamp,
        description="When the workflow started (UTC timestamp in seconds)",
        json_schema_extra={
            "label": "Gestartet am",
            "frontend_type": "timestamp",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )

    # --- Nested collections -------------------------------------------------
    logs: List[ChatLog] = Field(
        default_factory=list,
        description="Workflow logs",
        json_schema_extra={
            "label": "Protokolle",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    messages: List[ChatMessage] = Field(
        default_factory=list,
        description="Messages in the workflow",
        json_schema_extra={
            "label": "Nachrichten",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )
    # Deliberately an untyped list: no element type is declared here.
    tasks: list = Field(
        default_factory=list,
        description="List of tasks in the workflow",
        json_schema_extra={
            "label": "Aufgaben",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )

    # --- Mode and limits ----------------------------------------------------
    # NOTE(review): like `status`, this field has no "label" entry — confirm.
    workflowMode: WorkflowModeEnum = Field(
        default=WorkflowModeEnum.WORKFLOW_DYNAMIC,
        description="Workflow mode selector",
        json_schema_extra={
            "frontend_type": "select",
            "frontend_readonly": False,
            "frontend_required": False,
            "frontend_options": [
                {
                    "value": WorkflowModeEnum.WORKFLOW_DYNAMIC.value,
                    "label": "Dynamic",
                },
                {
                    "value": WorkflowModeEnum.WORKFLOW_AUTOMATION.value,
                    "label": "Automation",
                },
                {
                    "value": WorkflowModeEnum.WORKFLOW_CHATBOT.value,
                    "label": "Chatbot",
                },
            ],
        },
    )
    maxSteps: int = Field(
        default=10,
        description="Maximum number of iterations in dynamic mode",
        json_schema_extra={
            "label": "Max. Schritte",
            "frontend_type": "integer",
            "frontend_readonly": False,
            "frontend_required": False,
        },
    )
    expectedFormats: Optional[List[str]] = Field(
        default=None,
        description="List of expected file format extensions from user request (e.g., ['xlsx', 'pdf']). Extracted during intent analysis.",
        json_schema_extra={
            "label": "Erwartete Formate",
            "frontend_type": "text",
            "frontend_readonly": True,
            "frontend_required": False,
        },
    )

    # ------------------------------------------------------------------------
    # Execution-state helpers: thin accessors / mutators over the counters.
    # ------------------------------------------------------------------------
    def getRoundIndex(self) -> int:
        """Return the value of ``currentRound``."""
        return self.currentRound

    def getTaskIndex(self) -> int:
        """Return the value of ``currentTask``."""
        return self.currentTask

    def getActionIndex(self) -> int:
        """Return the value of ``currentAction``."""
        return self.currentAction

    def incrementRound(self):
        """Advance to the next round and reset the task and action counters."""
        self.currentRound += 1
        # A new round starts counting tasks and actions from zero again.
        self.currentTask = self.currentAction = 0

    def incrementTask(self):
        """Advance to the next task of the round and reset the action counter."""
        self.currentTask += 1
        self.currentAction = 0

    def incrementAction(self):
        """Advance to the next action of the current task."""
        self.currentAction += 1
@i18nModel("Benutzereingabe")
class UserInputRequest(BaseModel):
    # Incoming user request. Only `prompt` is required.
    prompt: str = Field(
        description="Prompt for the user",
        json_schema_extra={"label": "Eingabeaufforderung"},
    )
    listFileId: List[str] = Field(
        default_factory=list,
        description="List of file IDs",
        json_schema_extra={"label": "Datei-IDs"},
    )
    userLanguage: str = Field(
        default="en",
        description="User's preferred language",
        json_schema_extra={"label": "Benutzersprache"},
    )
    # None means no existing workflow was named for continuation.
    workflowId: Optional[str] = Field(
        default=None,
        description="Optional ID of the workflow to continue",
        json_schema_extra={
            "label": "Workflow-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatWorkflow"},
        },
    )
    allowedProviders: Optional[List[str]] = Field(
        default=None,
        description="List of allowed AI providers (multiselect)",
        json_schema_extra={"label": "Erlaubte Anbieter"},
    )
@i18nModel("Aktions-Dokument")
class ActionDocument(BaseModel):
    """Clear document structure for action results"""

    # Required core: name, payload and MIME type.
    documentName: str = Field(
        description="Name of the document",
        json_schema_extra={"label": "Dokumentname"},
    )
    # Typed as Any: the model places no constraint on the payload's type.
    documentData: Any = Field(
        description="Content/data of the document",
        json_schema_extra={"label": "Dokumentdaten"},
    )
    mimeType: str = Field(
        description="MIME type of the document",
        json_schema_extra={"label": "MIME-Typ"},
    )

    # Optional side-channel data.
    sourceJson: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Source JSON structure (preserved when rendering to xlsx/docx/pdf)",
        json_schema_extra={"label": "Quell-JSON"},
    )
    validationMetadata: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Action-specific metadata for content validation (e.g., email recipients, attachments, SharePoint paths)",
        json_schema_extra={"label": "Validierungs-Metadaten"},
    )
@i18nModel("Aktionsergebnis")
class ActionResult(BaseModel):
    """Clean action result with documents as primary output

    IMPORTANT: Action methods should NOT set resultLabel in their return value.
    The resultLabel is managed by the action handler using the action's execResultLabel
    from the action plan. This ensures consistent document routing throughout the workflow.
    """

    success: bool = Field(
        description="Whether execution succeeded",
        json_schema_extra={"label": "Erfolg"},
    )
    error: Optional[str] = Field(
        None,
        description="Error message if failed",
        json_schema_extra={"label": "Fehler"},
    )
    documents: List[ActionDocument] = Field(
        default_factory=list,
        description="Document outputs",
        json_schema_extra={"label": "Dokumente"},
    )
    data: Optional[Dict[str, Any]] = Field(
        None,
        description="Structured result data accessible via DataRef",
        json_schema_extra={"label": "Daten"},
    )
    resultLabel: Optional[str] = Field(
        None,
        description="Label for document routing (set by action handler, not by action methods)",
        json_schema_extra={"label": "Ergebnis-Label"},
    )

    @classmethod
    def isSuccess(
        cls,
        documents: Optional[List[ActionDocument]] = None,
        data: Optional[Dict[str, Any]] = None,
    ) -> "ActionResult":
        """Build a successful result.

        Args:
            documents: Produced documents; ``None`` (or an empty list) yields
                an empty ``documents`` list.
            data: Optional structured result data, stored as given.

        Returns:
            An instance with ``success=True``; ``error`` and ``resultLabel``
            keep their ``None`` defaults.
        """
        # Parameters default to None, so they are annotated Optional explicitly
        # (the implicit-Optional form is rejected by current type checkers).
        return cls(success=True, documents=documents or [], data=data)

    @classmethod
    def isFailure(
        cls, error: str, documents: Optional[List[ActionDocument]] = None
    ) -> "ActionResult":
        """Build a failed result.

        Args:
            error: Error message stored in ``error``.
            documents: Documents produced before the failure, if any; ``None``
                (or an empty list) yields an empty ``documents`` list.

        Returns:
            An instance with ``success=False``; ``data`` and ``resultLabel``
            keep their ``None`` defaults.
        """
        return cls(success=False, documents=documents or [], error=error)
@i18nModel("Aktionsauswahl")
class ActionSelection(BaseModel):
    # Identifies an action by its method group and its name inside that group.
    # Both fields are required.
    method: str = Field(
        description="Method to execute (e.g., web, document, ai)",
        json_schema_extra={"label": "Methode"},
    )
    name: str = Field(
        description="Action name within the method (e.g., search, extract)",
        json_schema_extra={"label": "Aktionsname"},
    )
@i18nModel("Aktionsparameter")
class ActionParameters(BaseModel):
    # Wrapper around a free-form parameter mapping; defaults to an empty dict.
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters to execute the selected action",
        json_schema_extra={"label": "Parameter"},
    )
# NOTE(review): the original stacked four decorators on ObservationPreview
# ("Beobachtungs-Vorschau" and "Beobachtung", each twice) while Observation had
# none. Each label is now applied once, on the class it names — confirm against
# the i18n registry that "Beobachtung" is meant for Observation.
@i18nModel("Beobachtungs-Vorschau")
class ObservationPreview(BaseModel):
    # Compact preview of one produced output. Only `name` is required.
    name: str = Field(
        description="Document name or URL label",
        json_schema_extra={"label": "Name"},
    )
    mime: Optional[str] = Field(
        default=None,
        description="MIME type or kind (legacy field)",
        json_schema_extra={"label": "MIME"},
    )
    snippet: Optional[str] = Field(
        default=None,
        description="Short snippet or summary",
        json_schema_extra={"label": "Ausschnitt"},
    )

    # Extended metadata fields
    mimeType: Optional[str] = Field(
        default=None,
        description="MIME type",
        json_schema_extra={"label": "MIME-Typ"},
    )
    # Sizes and timestamps are declared as strings in this preview model.
    size: Optional[str] = Field(
        default=None,
        description="File size",
        json_schema_extra={"label": "Größe"},
    )
    created: Optional[str] = Field(
        default=None,
        description="Creation timestamp",
        json_schema_extra={"label": "Erstellt"},
    )
    modified: Optional[str] = Field(
        default=None,
        description="Modification timestamp",
        json_schema_extra={"label": "Geändert"},
    )
    typeGroup: Optional[str] = Field(
        default=None,
        description="Document type group",
        json_schema_extra={"label": "Typgruppe"},
    )
    documentId: Optional[str] = Field(
        default=None,
        description="Document ID",
        json_schema_extra={"label": "Dokument-ID"},
    )
    reference: Optional[str] = Field(
        default=None,
        description="Document reference",
        json_schema_extra={"label": "Referenz"},
    )
    contentSize: Optional[str] = Field(
        default=None,
        description="Content size indicator",
        json_schema_extra={"label": "Inhaltsgröße"},
    )


@i18nModel("Beobachtung")
class Observation(BaseModel):
    # Summary of one action execution: outcome flag, routing label, document
    # count, plus optional previews, notes and validation/analysis payloads.
    success: bool = Field(description="Action execution success flag")
    resultLabel: str = Field(description="Deterministic label for produced documents")
    documentsCount: int = Field(description="Number of produced documents")
    previews: List[ObservationPreview] = Field(
        default_factory=list, description="Compact previews of outputs"
    )
    notes: List[str] = Field(
        default_factory=list, description="Short notes or key facts"
    )

    # Extended fields for enhanced validation
    contentValidation: Optional[Dict[str, Any]] = Field(
        default=None, description="Content validation results"
    )
    contentAnalysis: Optional[Dict[str, Any]] = Field(
        default=None, description="Content analysis results"
    )
class TaskStatus(str, Enum):
    """Closed set of status values; members are also plain strings."""

    # Used as the `status` type of ActionItem, ChatTaskResult and TaskItem.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
@i18nModel("Dokumentaustausch")
class DocumentExchange(BaseModel):
    # A labelled set of document references (plain strings).
    documentsLabel: str = Field(
        description="Label for the set of documents",
        json_schema_extra={"label": "Dokumenten-Label"},
    )
    documents: List[str] = Field(
        default_factory=list,
        description="List of document references",
        json_schema_extra={"label": "Dokumente"},
    )
@i18nModel("Aufgaben-Aktion")
class ActionItem(BaseModel):
    # One planned or executed action inside a task.
    # Required: id, execMethod, execAction, timestamp.

    # --- What to execute ----------------------------------------------------
    id: str = Field(..., description="Action ID", json_schema_extra={"label": "Aktions-ID"})
    execMethod: str = Field(..., description="Method to execute", json_schema_extra={"label": "Methode"})
    execAction: str = Field(..., description="Action to perform", json_schema_extra={"label": "Aktion"})
    execParameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Action parameters",
        json_schema_extra={"label": "Parameter"},
    )
    execResultLabel: Optional[str] = Field(
        None,
        description="Label for the set of result documents",
        json_schema_extra={"label": "Ergebnis-Label"},
    )
    expectedDocumentFormats: Optional[List[Dict[str, str]]] = Field(
        None,
        description="Expected document formats (optional)",
        json_schema_extra={"label": "Erwartete Dokumentformate"},
    )
    userMessage: Optional[str] = Field(
        None,
        description="User-friendly message in user's language",
        json_schema_extra={"label": "Benutzernachricht"},
    )

    # --- Execution state ----------------------------------------------------
    status: TaskStatus = Field(
        default=TaskStatus.PENDING,
        description="Action status",
        json_schema_extra={"label": "Status"},
    )
    error: Optional[str] = Field(
        None,
        description="Error message if action failed",
        json_schema_extra={"label": "Fehler"},
    )
    retryCount: int = Field(
        default=0,
        description="Number of retries attempted",
        json_schema_extra={"label": "Wiederholungen"},
    )
    retryMax: int = Field(
        default=3,
        description="Maximum number of retries",
        json_schema_extra={"label": "Max. Wiederholungen"},
    )
    processingTime: Optional[float] = Field(
        None,
        description="Processing time in seconds",
        json_schema_extra={"label": "Bearbeitungszeit"},
    )
    timestamp: float = Field(
        ...,
        description="When the action was executed (UTC timestamp in seconds)",
        json_schema_extra={"label": "Zeitstempel"},
    )
    result: Optional[str] = Field(
        None,
        description="Result of the action",
        json_schema_extra={"label": "Ergebnis"},
    )

    def setSuccess(self, result: Optional[str] = None) -> None:
        """Mark the action as completed and clear any recorded error.

        Args:
            result: New result text. When ``None`` (the default) the current
                ``result`` value is left unchanged.
        """
        self.status = TaskStatus.COMPLETED
        self.error = None
        if result is not None:
            self.result = result

    def setError(self, error_message: str) -> None:
        """Mark the action as failed and record the error message.

        ``result`` is not modified.

        Args:
            error_message: Text stored in ``error``.
        """
        self.status = TaskStatus.FAILED
        self.error = error_message
@i18nModel("Chat-Aufgabenergebnis")
class ChatTaskResult(BaseModel):
    # Outcome of one task. `taskId` and `success` are required.
    taskId: str = Field(
        description="Task ID",
        json_schema_extra={"label": "Aufgaben-ID"},
    )
    status: TaskStatus = Field(
        default=TaskStatus.PENDING,
        description="Task status",
        json_schema_extra={"label": "Status"},
    )
    success: bool = Field(
        description="Whether the task was successful",
        json_schema_extra={"label": "Erfolg"},
    )
    feedback: Optional[str] = Field(
        default=None,
        description="Task feedback message",
        json_schema_extra={"label": "Rückmeldung"},
    )
    error: Optional[str] = Field(
        default=None,
        description="Error message if task failed",
        json_schema_extra={"label": "Fehler"},
    )
@i18nModel("Aufgabe")
class TaskItem(BaseModel):
    # A task of a workflow together with its action list and retry settings.
    # Required: id, workflowId, userInput.

    # --- Identity -----------------------------------------------------------
    id: str = Field(
        description="Task ID",
        json_schema_extra={"label": "Aufgaben-ID"},
    )
    workflowId: str = Field(
        description="Workflow ID",
        json_schema_extra={
            "label": "Workflow-ID",
            "fk_target": {"db": "poweron_chat", "table": "ChatWorkflow"},
        },
    )
    userInput: str = Field(
        description="User input that triggered the task",
        json_schema_extra={"label": "Benutzereingabe"},
    )

    # --- Execution state ----------------------------------------------------
    status: TaskStatus = Field(
        default=TaskStatus.PENDING,
        description="Task status",
        json_schema_extra={"label": "Status"},
    )
    error: Optional[str] = Field(
        default=None,
        description="Error message if task failed",
        json_schema_extra={"label": "Fehler"},
    )
    startedAt: Optional[float] = Field(
        default=None,
        description="When the task started (UTC timestamp in seconds)",
        json_schema_extra={"label": "Gestartet am"},
    )
    finishedAt: Optional[float] = Field(
        default=None,
        description="When the task finished (UTC timestamp in seconds)",
        json_schema_extra={"label": "Beendet am"},
    )

    # --- Plan and retry policy ----------------------------------------------
    actionList: List[ActionItem] = Field(
        default_factory=list,
        description="List of actions to execute",
        json_schema_extra={"label": "Aktionen"},
    )
    retryCount: int = Field(
        default=0,
        description="Number of retries attempted",
        json_schema_extra={"label": "Wiederholungen"},
    )
    retryMax: int = Field(
        default=3,
        description="Maximum number of retries",
        json_schema_extra={"label": "Max. Wiederholungen"},
    )
    rollbackOnFailure: bool = Field(
        default=True,
        description="Whether to rollback on failure",
        json_schema_extra={"label": "Bei Fehler zurücksetzen"},
    )
    dependencies: List[str] = Field(
        default_factory=list,
        description="List of task IDs this task depends on",
        json_schema_extra={"label": "Abhängigkeiten"},
    )

    # --- Results ------------------------------------------------------------
    feedback: Optional[str] = Field(
        default=None,
        description="Task feedback message",
        json_schema_extra={"label": "Rückmeldung"},
    )
    processingTime: Optional[float] = Field(
        default=None,
        description="Total processing time in seconds",
        json_schema_extra={"label": "Bearbeitungszeit"},
    )
    # Optional type, but the default is an empty dict rather than None.
    resultLabels: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Map of result labels to their values",
        json_schema_extra={"label": "Ergebnis-Labels"},
    )
@i18nModel("Aufgabenschritt")
class TaskStep(BaseModel):
    # One step of a task plan. `id` and `objective` are required.
    id: str = Field(description="Task identifier", json_schema_extra={"label": "ID"})
    objective: str = Field(description="Task objective", json_schema_extra={"label": "Ziel"})
    # Label corrected: this field previously reused the "ID" label of the `id`
    # field; it now matches the label of TaskItem.dependencies.
    dependencies: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Abhängigkeiten"},
    )
    successCriteria: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Erfolgskriterien"},
    )
    estimatedComplexity: Optional[str] = None
    userMessage: Optional[str] = Field(
        None,
        description="User-friendly message in user's language",
        json_schema_extra={"label": "Benutzernachricht"},
    )

    # Format details extracted from intent analysis
    dataType: Optional[str] = Field(
        None,
        description="Expected data type (text, numbers, documents, etc.)",
        json_schema_extra={"label": "Datentyp"},
    )
    expectedFormats: Optional[List[str]] = Field(
        None,
        description="Expected output file format extensions (e.g., ['docx', 'pdf', 'xlsx']). Use actual file extensions, not conceptual terms.",
        json_schema_extra={"label": "Erwartete Formate"},
    )
    qualityRequirements: Optional[Dict[str, Any]] = Field(
        None,
        description="Quality requirements and constraints",
        json_schema_extra={"label": "Qualitätsanforderungen"},
    )
# The decorator was previously applied twice with the identical label; it is
# applied once here.
@i18nModel("Aufgabenübergabe")
class TaskHandover(BaseModel):
    # Data handed from one task to the next.
    # Required: taskId and timestamp.

    # --- Routing ------------------------------------------------------------
    taskId: str = Field(description="Target task ID", json_schema_extra={"label": "Aufgaben-ID"})
    sourceTask: Optional[str] = Field(
        None,
        description="Source task ID",
        json_schema_extra={"label": "Quell-Aufgabe"},
    )

    # --- Documents ----------------------------------------------------------
    inputDocuments: List[DocumentExchange] = Field(
        default_factory=list,
        description="Available input documents",
        json_schema_extra={"label": "Eingabedokumente"},
    )
    outputDocuments: List[DocumentExchange] = Field(
        default_factory=list,
        description="Produced output documents",
        json_schema_extra={"label": "Ausgabedokumente"},
    )

    # --- Context and history ------------------------------------------------
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Task context",
        json_schema_extra={"label": "Kontext"},
    )
    previousResults: List[str] = Field(
        default_factory=list,
        description="Previous result summaries",
        json_schema_extra={"label": "Vorherige Ergebnisse"},
    )
    improvements: List[str] = Field(
        default_factory=list,
        description="Improvement suggestions",
        json_schema_extra={"label": "Verbesserungen"},
    )
    workflowSummary: Optional[str] = Field(
        None,
        description="Summarized workflow context",
        json_schema_extra={"label": "Workflow-Zusammenfassung"},
    )
    messageHistory: List[str] = Field(
        default_factory=list,
        description="Key message summaries",
        json_schema_extra={"label": "Nachrichtenverlauf"},
    )

    # --- Bookkeeping --------------------------------------------------------
    timestamp: float = Field(
        ...,
        description="When the handover was created (UTC timestamp in seconds)",
        json_schema_extra={"label": "Zeitstempel"},
    )
    handoverType: str = Field(
        default="task",
        description="Type of handover: task, phase, or workflow",
        json_schema_extra={"label": "Übergabetyp"},
    )
class TaskContext(BaseModel):
    # Working context passed around while a task step is planned and executed.
    # Only `taskStep` is required; every other field has a default.
    taskStep: TaskStep
    workflow: Optional[ChatWorkflow] = None
    workflowId: Optional[str] = None
    availableDocuments: Optional[str] = "No documents available"
    availableConnections: Optional[list[str]] = Field(default_factory=list)
    previousResults: Optional[list[str]] = Field(default_factory=list)
    previousHandover: Optional[TaskHandover] = None
    improvements: Optional[list[str]] = Field(default_factory=list)
    retryCount: Optional[int] = 0
    previousActionResults: Optional[list] = Field(default_factory=list)
    previousReviewResult: Optional[dict] = None
    isRegeneration: Optional[bool] = False
    failurePatterns: Optional[list[str]] = Field(default_factory=list)
    failedActions: Optional[list] = Field(default_factory=list)
    successfulActions: Optional[list] = Field(default_factory=list)
    executedActions: Optional[list] = Field(
        default_factory=list,
        description="List of executed actions with action name, parameters, and step number",
    )
    criteriaProgress: Optional[dict] = None

    # Stage 2 context fields (NEW)
    actionObjective: Optional[str] = Field(None, description="Objective for current action")
    parametersContext: Optional[str] = Field(None, description="Context for parameter generation")
    learnings: Optional[list[str]] = Field(
        default_factory=list, description="Learnings from previous actions"
    )
    stage1Selection: Optional[dict] = Field(None, description="Stage 1 selection data")
    nextActionGuidance: Optional[Dict[str, Any]] = Field(
        None, description="Guidance for the next action from previous refinement"
    )

    def updateFromSelection(self, selection: Any):
        """Copy Stage 1 selection data into this context.

        Does nothing when ``selection`` is not an ``ActionDefinition``.

        Args:
            selection: ActionDefinition instance from Stage 1
        """
        # Imported here rather than at module level, as in the original
        # (presumably to avoid a circular import — TODO confirm).
        from modules.datamodels.datamodelWorkflow import ActionDefinition

        if not isinstance(selection, ActionDefinition):
            return
        self.actionObjective = selection.actionObjective
        self.parametersContext = selection.parametersContext
        # A falsy learnings value (None or empty) is normalised to a new list.
        self.learnings = selection.learnings or []
        self.stage1Selection = selection.model_dump()

    def getDocumentReferences(self) -> List[str]:
        """Return the unique document references of the previous handover.

        Only ``previousHandover.inputDocuments`` is consulted. Duplicates are
        dropped and the first-seen order is kept, so the result is the same on
        every run (a ``set`` round-trip would not guarantee any order).

        Returns:
            A list of unique references; empty when there is no previous
            handover.
        """
        if not self.previousHandover:
            return []
        references: List[str] = []
        for exchange in self.previousHandover.inputDocuments:
            references.extend(exchange.documents)
        # dict keys preserve insertion order, giving a stable de-duplication.
        return list(dict.fromkeys(references))

    def addImprovement(self, improvement: str) -> None:
        """Append ``improvement`` to ``improvements`` unless already present.

        ``improvements`` is initialised to an empty list first when it is
        ``None``.
        """
        if self.improvements is None:
            self.improvements = []
        if improvement not in self.improvements:
            self.improvements.append(improvement)
class ReviewContext(BaseModel):
    # Input bundle for reviewing a task step. Only `taskStep` is required.
    taskStep: TaskStep

    # Collections default to fresh empty containers per instance.
    taskActions: Optional[list] = Field(default_factory=list)
    actionResults: Optional[list] = Field(default_factory=list)
    stepResult: Optional[dict] = Field(default_factory=dict)

    workflowId: Optional[str] = None
    previousResults: Optional[list[str]] = Field(default_factory=list)
# The decorator was previously applied twice with the identical label; it is
# applied once here.
@i18nModel("Prüfergebnis")
class ReviewResult(BaseModel):
    # Outcome of a review. Only `status` is required.
    status: str
    reason: Optional[str] = None
    improvements: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Verbesserungen"},
    )
    qualityScore: Optional[float] = Field(
        default=5.0,
        description="Quality score (0-10)",
        json_schema_extra={"label": "Qualitätsscore"},
    )
    missingOutputs: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Fehlende Ausgaben"},
    )
    metCriteria: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Erfüllte Kriterien"},
    )
    unmetCriteria: Optional[list[str]] = Field(
        default_factory=list,
        json_schema_extra={"label": "Nicht erfüllte Kriterien"},
    )
    confidence: Optional[float] = 0.5
    userMessage: Optional[str] = Field(
        None,
        description="User-friendly message in user's language",
        json_schema_extra={"label": "Benutzernachricht"},
    )

    # NEW: Concrete next action guidance (when status is "continue")
    nextAction: Optional[str] = Field(
        None,
        description="Specific action to execute next (e.g., 'ai.convert', 'ai.process', 'ai.reformat')",
        json_schema_extra={"label": "Nächste Aktion"},
    )
    nextActionParameters: Optional[Dict[str, Any]] = Field(
        None,
        description="Parameters for the next action (e.g., {'fromFormat': 'json', 'toFormat': 'csv'})",
        json_schema_extra={"label": "Parameter nächste Aktion"},
    )
    nextActionObjective: Optional[str] = Field(
        None,
        description="What this specific action will achieve",
        json_schema_extra={"label": "Ziel nächste Aktion"},
    )
@i18nModel("Aufgabenplan")
class TaskPlan(BaseModel):
    # A plan: free-text overview plus the ordered list of steps.
    # `overview` and `tasks` are required.
    overview: str = Field(
        json_schema_extra={"label": "Überblick"},
    )
    tasks: list[TaskStep] = Field(
        json_schema_extra={"label": "Aufgaben"},
    )
    userMessage: Optional[str] = Field(
        default=None,
        description="Overall user-friendly message for the task plan",
        json_schema_extra={"label": "Benutzernachricht"},
    )
# Forward references resolve automatically because ChatWorkflow is defined above the models that reference it (e.g. TaskContext.workflow).
@i18nModel("Prompt-Platzhalter")
class PromptPlaceholder(BaseModel):
    # One named placeholder and the content to substitute for it.
    # `label` and `content` are required.
    label: str = Field(
        json_schema_extra={"label": "Bezeichnung"},
    )
    content: str = Field(
        json_schema_extra={"label": "Inhalt"},
    )
    # Off by default.
    summaryAllowed: bool = Field(
        default=False,
        description="Whether host may summarize content before sending to AI",
        json_schema_extra={"label": "Zusammenfassung erlaubt"},
    )
@i18nModel("Prompt-Paket")
class PromptBundle(BaseModel):
    # A prompt text together with the placeholders that belong to it.
    prompt: str = Field(json_schema_extra={"label": "Prompt"})
    # Label corrected: this field previously reused the "Prompt" label of the
    # `prompt` field above, so the two fields were indistinguishable by label.
    placeholders: List[PromptPlaceholder] = Field(
        default_factory=list,
        json_schema_extra={"label": "Platzhalter"},
    )