gateway/modules/datamodels/datamodelWorkflow.py
2025-10-03 13:03:22 +02:00

474 lines
18 KiB
Python

"""Workflow-related base datamodels and step/task structures."""
from enum import Enum
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field
from modules.shared.attributeUtils import register_model_labels, ModelMixin
class ActionDocument(BaseModel, ModelMixin):
"""Clear document structure for action results"""
documentName: str = Field(description="Name of the document")
documentData: Any = Field(description="Content/data of the document")
mimeType: str = Field(description="MIME type of the document")
register_model_labels(
"ActionDocument",
{"en": "Action Document", "fr": "Document d'action"},
{
"documentName": {"en": "Document Name", "fr": "Nom du document"},
"documentData": {"en": "Document Data", "fr": "Données du document"},
"mimeType": {"en": "MIME Type", "fr": "Type MIME"},
},
)
class ActionResult(BaseModel, ModelMixin):
"""Clean action result with documents as primary output
IMPORTANT: Action methods should NOT set resultLabel in their return value.
The resultLabel is managed by the action handler using the action's execResultLabel
from the action plan. This ensures consistent document routing throughout the workflow.
"""
success: bool = Field(description="Whether execution succeeded")
error: Optional[str] = Field(None, description="Error message if failed")
documents: List[ActionDocument] = Field(
default_factory=list, description="Document outputs"
)
resultLabel: Optional[str] = Field(
None,
description="Label for document routing (set by action handler, not by action methods)",
)
@classmethod
def isSuccess(cls, documents: List[ActionDocument] = None) -> "ActionResult":
return cls(success=True, documents=documents or [])
@classmethod
def isFailure(
cls, error: str, documents: List[ActionDocument] = None
) -> "ActionResult":
return cls(success=False, documents=documents or [], error=error)
register_model_labels(
"ActionResult",
{"en": "Action Result", "fr": "Résultat de l'action"},
{
"success": {"en": "Success", "fr": "Succès"},
"error": {"en": "Error", "fr": "Erreur"},
"documents": {"en": "Documents", "fr": "Documents"},
"resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"},
},
)
class ActionSelection(BaseModel, ModelMixin):
method: str = Field(description="Method to execute (e.g., web, document, ai)")
name: str = Field(
description="Action name within the method (e.g., search, extract)"
)
register_model_labels(
"ActionSelection",
{"en": "Action Selection", "fr": "Sélection d'action"},
{
"method": {"en": "Method", "fr": "Méthode"},
"name": {"en": "Action Name", "fr": "Nom de l'action"},
},
)
class ActionParameters(BaseModel, ModelMixin):
parameters: Dict[str, Any] = Field(
default_factory=dict, description="Parameters to execute the selected action"
)
register_model_labels(
"ActionParameters",
{"en": "Action Parameters", "fr": "Paramètres d'action"},
{
"parameters": {"en": "Parameters", "fr": "Paramètres"},
},
)
class ObservationPreview(BaseModel, ModelMixin):
name: str = Field(description="Document name or URL label")
mime: str = Field(description="MIME type or kind")
snippet: str = Field(description="Short snippet or summary")
register_model_labels(
"ObservationPreview",
{"en": "Observation Preview", "fr": "Aperçu d'observation"},
{
"name": {"en": "Name", "fr": "Nom"},
"mime": {"en": "MIME", "fr": "MIME"},
"snippet": {"en": "Snippet", "fr": "Extrait"},
},
)
class Observation(BaseModel, ModelMixin):
success: bool = Field(description="Action execution success flag")
resultLabel: str = Field(description="Deterministic label for produced documents")
documentsCount: int = Field(description="Number of produced documents")
previews: List[ObservationPreview] = Field(
default_factory=list, description="Compact previews of outputs"
)
notes: List[str] = Field(
default_factory=list, description="Short notes or key facts"
)
register_model_labels(
"Observation",
{"en": "Observation", "fr": "Observation"},
{
"success": {"en": "Success", "fr": "Succès"},
"resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"},
"documentsCount": {"en": "Documents Count", "fr": "Nombre de documents"},
"previews": {"en": "Previews", "fr": "Aperçus"},
"notes": {"en": "Notes", "fr": "Notes"},
},
)
class TaskStatus(str, Enum):
"""Task status enumeration."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
register_model_labels(
"TaskStatus",
{"en": "Task Status", "fr": "Statut de la tâche"},
{
"PENDING": {"en": "Pending", "fr": "En attente"},
"RUNNING": {"en": "Running", "fr": "En cours"},
"COMPLETED": {"en": "Completed", "fr": "Terminé"},
"FAILED": {"en": "Failed", "fr": "Échec"},
"CANCELLED": {"en": "Cancelled", "fr": "Annulé"},
},
)
class DocumentExchange(BaseModel, ModelMixin):
documentsLabel: str = Field(description="Label for the set of documents")
documents: List[str] = Field(
default_factory=list, description="List of document references"
)
register_model_labels(
"DocumentExchange",
{"en": "Document Exchange", "fr": "Échange de documents"},
{
"documentsLabel": {"en": "Documents Label", "fr": "Label des documents"},
"documents": {"en": "Documents", "fr": "Documents"},
},
)
class TaskAction(BaseModel, ModelMixin):
id: str = Field(..., description="Action ID")
execMethod: str = Field(..., description="Method to execute")
execAction: str = Field(..., description="Action to perform")
execParameters: Dict[str, Any] = Field(
default_factory=dict, description="Action parameters"
)
execResultLabel: Optional[str] = Field(
None, description="Label for the set of result documents"
)
expectedDocumentFormats: Optional[List[Dict[str, str]]] = Field(
None, description="Expected document formats (optional)"
)
userMessage: Optional[str] = Field(
None, description="User-friendly message in user's language"
)
status: TaskStatus = Field(default=TaskStatus.PENDING, description="Action status")
error: Optional[str] = Field(None, description="Error message if action failed")
retryCount: int = Field(default=0, description="Number of retries attempted")
retryMax: int = Field(default=3, description="Maximum number of retries")
processingTime: Optional[float] = Field(
None, description="Processing time in seconds"
)
timestamp: float = Field(
..., description="When the action was executed (UTC timestamp in seconds)"
)
result: Optional[str] = Field(None, description="Result of the action")
register_model_labels(
"TaskAction",
{"en": "Task Action", "fr": "Action de tâche"},
{
"id": {"en": "Action ID", "fr": "ID de l'action"},
"execMethod": {"en": "Method", "fr": "Méthode"},
"execAction": {"en": "Action", "fr": "Action"},
"execParameters": {"en": "Parameters", "fr": "Paramètres"},
"execResultLabel": {"en": "Result Label", "fr": "Label du résultat"},
"expectedDocumentFormats": {
"en": "Expected Document Formats",
"fr": "Formats de documents attendus",
},
"userMessage": {"en": "User Message", "fr": "Message utilisateur"},
"status": {"en": "Status", "fr": "Statut"},
"error": {"en": "Error", "fr": "Erreur"},
"retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
"retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
"processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
"timestamp": {"en": "Timestamp", "fr": "Horodatage"},
"result": {"en": "Result", "fr": "Résultat"},
},
)
class TaskResult(BaseModel, ModelMixin):
taskId: str = Field(..., description="Task ID")
status: TaskStatus = Field(default=TaskStatus.PENDING, description="Task status")
success: bool = Field(..., description="Whether the task was successful")
feedback: Optional[str] = Field(None, description="Task feedback message")
error: Optional[str] = Field(None, description="Error message if task failed")
register_model_labels(
"TaskResult",
{"en": "Task Result", "fr": "Résultat de tâche"},
{
"taskId": {"en": "Task ID", "fr": "ID de la tâche"},
"status": {"en": "Status", "fr": "Statut"},
"success": {"en": "Success", "fr": "Succès"},
"feedback": {"en": "Feedback", "fr": "Retour"},
"error": {"en": "Error", "fr": "Erreur"},
},
)
class TaskItem(BaseModel, ModelMixin):
id: str = Field(..., description="Task ID")
workflowId: str = Field(..., description="Workflow ID")
userInput: str = Field(..., description="User input that triggered the task")
status: TaskStatus = Field(default=TaskStatus.PENDING, description="Task status")
error: Optional[str] = Field(None, description="Error message if task failed")
startedAt: Optional[float] = Field(
None, description="When the task started (UTC timestamp in seconds)"
)
finishedAt: Optional[float] = Field(
None, description="When the task finished (UTC timestamp in seconds)"
)
actionList: List[TaskAction] = Field(
default_factory=list, description="List of actions to execute"
)
retryCount: int = Field(default=0, description="Number of retries attempted")
retryMax: int = Field(default=3, description="Maximum number of retries")
rollbackOnFailure: bool = Field(
default=True, description="Whether to rollback on failure"
)
dependencies: List[str] = Field(
default_factory=list, description="List of task IDs this task depends on"
)
feedback: Optional[str] = Field(None, description="Task feedback message")
processingTime: Optional[float] = Field(
None, description="Total processing time in seconds"
)
resultLabels: Optional[Dict[str, Any]] = Field(
default_factory=dict, description="Map of result labels to their values"
)
register_model_labels(
"TaskItem",
{"en": "Task", "fr": "Tâche"},
{
"id": {"en": "Task ID", "fr": "ID de la tâche"},
"workflowId": {"en": "Workflow ID", "fr": "ID du workflow"},
"userInput": {"en": "User Input", "fr": "Entrée utilisateur"},
"status": {"en": "Status", "fr": "Statut"},
"error": {"en": "Error", "fr": "Erreur"},
"startedAt": {"en": "Started At", "fr": "Démarré à"},
"finishedAt": {"en": "Finished At", "fr": "Terminé à"},
"actionList": {"en": "Actions", "fr": "Actions"},
"retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
"retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
"processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
},
)
class TaskStep(BaseModel, ModelMixin):
id: str
objective: str
dependencies: Optional[list[str]] = Field(default_factory=list)
success_criteria: Optional[list[str]] = Field(default_factory=list)
estimated_complexity: Optional[str] = None
userMessage: Optional[str] = Field(
None, description="User-friendly message in user's language"
)
register_model_labels(
"TaskStep",
{"en": "Task Step", "fr": "Étape de tâche"},
{
"id": {"en": "ID", "fr": "ID"},
"objective": {"en": "Objective", "fr": "Objectif"},
"dependencies": {"en": "Dependencies", "fr": "Dépendances"},
"success_criteria": {"en": "Success Criteria", "fr": "Critères de succès"},
"estimated_complexity": {
"en": "Estimated Complexity",
"fr": "Complexité estimée",
},
"userMessage": {"en": "User Message", "fr": "Message utilisateur"},
},
)
class TaskHandover(BaseModel, ModelMixin):
taskId: str = Field(description="Target task ID")
sourceTask: Optional[str] = Field(None, description="Source task ID")
inputDocuments: List[DocumentExchange] = Field(
default_factory=list, description="Available input documents"
)
outputDocuments: List[DocumentExchange] = Field(
default_factory=list, description="Produced output documents"
)
context: Dict[str, Any] = Field(default_factory=dict, description="Task context")
previousResults: List[str] = Field(
default_factory=list, description="Previous result summaries"
)
improvements: List[str] = Field(
default_factory=list, description="Improvement suggestions"
)
workflowSummary: Optional[str] = Field(
None, description="Summarized workflow context"
)
messageHistory: List[str] = Field(
default_factory=list, description="Key message summaries"
)
timestamp: float = Field(
..., description="When the handover was created (UTC timestamp in seconds)"
)
handoverType: str = Field(
default="task", description="Type of handover: task, phase, or workflow"
)
register_model_labels(
"TaskHandover",
{"en": "Task Handover", "fr": "Transfert de tâche"},
{
"taskId": {"en": "Task ID", "fr": "ID de la tâche"},
"sourceTask": {"en": "Source Task", "fr": "Tâche source"},
"inputDocuments": {"en": "Input Documents", "fr": "Documents d'entrée"},
"outputDocuments": {"en": "Output Documents", "fr": "Documents de sortie"},
"context": {"en": "Context", "fr": "Contexte"},
"previousResults": {"en": "Previous Results", "fr": "Résultats précédents"},
"improvements": {"en": "Improvements", "fr": "Améliorations"},
"workflowSummary": {"en": "Workflow Summary", "fr": "Résumé du workflow"},
"messageHistory": {"en": "Message History", "fr": "Historique des messages"},
"timestamp": {"en": "Timestamp", "fr": "Horodatage"},
"handoverType": {"en": "Handover Type", "fr": "Type de transfert"},
},
)
class TaskContext(BaseModel, ModelMixin):
task_step: TaskStep
workflow: Optional["ChatWorkflow"] = None
workflow_id: Optional[str] = None
available_documents: Optional[str] = "No documents available"
available_connections: Optional[list[str]] = Field(default_factory=list)
previous_results: Optional[list[str]] = Field(default_factory=list)
previous_handover: Optional[TaskHandover] = None
improvements: Optional[list[str]] = Field(default_factory=list)
retry_count: Optional[int] = 0
previous_action_results: Optional[list] = Field(default_factory=list)
previous_review_result: Optional[dict] = None
is_regeneration: Optional[bool] = False
failure_patterns: Optional[list[str]] = Field(default_factory=list)
failed_actions: Optional[list] = Field(default_factory=list)
successful_actions: Optional[list] = Field(default_factory=list)
criteria_progress: Optional[dict] = None
def getDocumentReferences(self) -> List[str]:
docs = []
if self.previous_handover:
for doc_exchange in self.previous_handover.inputDocuments:
docs.extend(doc_exchange.documents)
return list(set(docs))
def addImprovement(self, improvement: str) -> None:
if improvement not in (self.improvements or []):
if self.improvements is None:
self.improvements = []
self.improvements.append(improvement)
class ReviewContext(BaseModel, ModelMixin):
task_step: TaskStep
task_actions: Optional[list] = Field(default_factory=list)
action_results: Optional[list] = Field(default_factory=list)
step_result: Optional[dict] = Field(default_factory=dict)
workflow_id: Optional[str] = None
previous_results: Optional[list[str]] = Field(default_factory=list)
class ReviewResult(BaseModel, ModelMixin):
status: str
reason: Optional[str] = None
improvements: Optional[list[str]] = Field(default_factory=list)
quality_score: Optional[int] = 5
missing_outputs: Optional[list[str]] = Field(default_factory=list)
met_criteria: Optional[list[str]] = Field(default_factory=list)
unmet_criteria: Optional[list[str]] = Field(default_factory=list)
confidence: Optional[float] = 0.5
userMessage: Optional[str] = Field(
None, description="User-friendly message in user's language"
)
register_model_labels(
"ReviewResult",
{"en": "Review Result", "fr": "Résultat de l'évaluation"},
{
"status": {"en": "Status", "fr": "Statut"},
"reason": {"en": "Reason", "fr": "Raison"},
"improvements": {"en": "Improvements", "fr": "Améliorations"},
"quality_score": {"en": "Quality Score", "fr": "Score de qualité"},
"missing_outputs": {"en": "Missing Outputs", "fr": "Sorties manquantes"},
"met_criteria": {"en": "Met Criteria", "fr": "Critères respectés"},
"unmet_criteria": {"en": "Unmet Criteria", "fr": "Critères non respectés"},
"confidence": {"en": "Confidence", "fr": "Confiance"},
"userMessage": {"en": "User Message", "fr": "Message utilisateur"},
},
)
class TaskPlan(BaseModel, ModelMixin):
overview: str
tasks: list[TaskStep]
userMessage: Optional[str] = Field(
None, description="Overall user-friendly message for the task plan"
)
register_model_labels(
"TaskPlan",
{"en": "Task Plan", "fr": "Plan de tâches"},
{
"overview": {"en": "Overview", "fr": "Aperçu"},
"tasks": {"en": "Tasks", "fr": "Tâches"},
"userMessage": {"en": "User Message", "fr": "Message utilisateur"},
},
)