399 lines
17 KiB
Python
399 lines
17 KiB
Python
"""Workflow-related base datamodels and step/task structures."""
|
|
|
|
from typing import List, Dict, Any, Optional
|
|
from pydantic import BaseModel, Field
|
|
from modules.shared.attributeUtils import register_model_labels, ModelMixin
|
|
|
|
|
|
# One output document of an action: a name, a payload and a MIME type.
class ActionDocument(BaseModel, ModelMixin):
    """Clear document structure for action results"""

    documentName: str = Field(..., description="Name of the document")
    # Typed as Any, so no structure is enforced on the payload at this level.
    documentData: Any = Field(description="Content/data of the document")
    mimeType: str = Field(..., description="MIME type of the document")
|
|
# Register en/fr labels for the ActionDocument model and each of its fields.
register_model_labels(
    "ActionDocument",
    {"en": "Action Document", "fr": "Document d'action"},
    {
        "documentName": {"en": "Document Name", "fr": "Nom du document"},
        "documentData": {"en": "Document Data", "fr": "Données du document"},
        "mimeType": {"en": "MIME Type", "fr": "Type MIME"},
    },
)
|
|
|
|
|
|
class ActionResult(BaseModel, ModelMixin):
    """Clean action result with documents as primary output

    IMPORTANT: Action methods should NOT set resultLabel in their return value.
    The resultLabel is managed by the action handler using the action's execResultLabel
    from the action plan. This ensures consistent document routing throughout the workflow.
    """

    success: bool = Field(description="Whether execution succeeded")
    error: Optional[str] = Field(None, description="Error message if failed")
    documents: List[ActionDocument] = Field(default_factory=list, description="Document outputs")
    resultLabel: Optional[str] = Field(
        None,
        description="Label for document routing (set by action handler, not by action methods)",
    )

    @classmethod
    def isSuccess(cls, documents: Optional[List[ActionDocument]] = None) -> "ActionResult":
        """Build a successful result.

        Args:
            documents: Documents produced by the action; ``None`` (the default)
                or an empty list yields a result with no documents.

        Returns:
            A new instance with ``success=True``; ``error`` and ``resultLabel``
            keep their ``None`` defaults.
        """
        return cls(success=True, documents=documents or [])

    @classmethod
    def isFailure(
        cls,
        error: str,
        documents: Optional[List[ActionDocument]] = None,
    ) -> "ActionResult":
        """Build a failed result carrying an error message.

        Args:
            error: Message stored in the ``error`` field.
            documents: Documents to attach despite the failure; ``None`` (the
                default) or an empty list yields a result with no documents.

        Returns:
            A new instance with ``success=False``; ``resultLabel`` keeps its
            ``None`` default.
        """
        return cls(success=False, documents=documents or [], error=error)
|
|
# Register en/fr labels for the ActionResult model and each of its fields.
register_model_labels(
    "ActionResult",
    {"en": "Action Result", "fr": "Résultat de l'action"},
    {
        "success": {"en": "Success", "fr": "Succès"},
        "error": {"en": "Error", "fr": "Erreur"},
        "documents": {"en": "Documents", "fr": "Documents"},
        "resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"},
    },
)
|
|
|
|
|
|
# Selects an action by its method and by the action name within that method.
class ActionSelection(BaseModel, ModelMixin):
    method: str = Field(
        ...,
        description="Method to execute (e.g., web, document, ai)",
    )
    name: str = Field(
        ...,
        description="Action name within the method (e.g., search, extract)",
    )
|
|
|
|
|
|
# Register en/fr labels for the ActionSelection model and each of its fields.
register_model_labels(
    "ActionSelection",
    {"en": "Action Selection", "fr": "Sélection d'action"},
    {
        "method": {"en": "Method", "fr": "Méthode"},
        "name": {"en": "Action Name", "fr": "Nom de l'action"},
    },
)
|
|
|
|
|
|
# Wrapper around the free-form parameter mapping of a selected action.
class ActionParameters(BaseModel, ModelMixin):
    # Each instance gets its own empty dict when no parameters are supplied.
    parameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Parameters to execute the selected action",
    )
|
|
|
|
|
|
# Register en/fr labels for the ActionParameters model and its single field.
register_model_labels(
    "ActionParameters",
    {"en": "Action Parameters", "fr": "Paramètres d'action"},
    {
        "parameters": {"en": "Parameters", "fr": "Paramètres"},
    },
)
|
|
|
|
|
|
# Compact preview of one output: a name, a MIME/kind string and a short snippet.
class ObservationPreview(BaseModel, ModelMixin):
    name: str = Field(..., description="Document name or URL label")
    mime: str = Field(..., description="MIME type or kind")
    snippet: str = Field(..., description="Short snippet or summary")
|
|
|
|
|
|
# Register en/fr labels for the ObservationPreview model and each of its fields.
register_model_labels(
    "ObservationPreview",
    {"en": "Observation Preview", "fr": "Aperçu d'observation"},
    {
        "name": {"en": "Name", "fr": "Nom"},
        "mime": {"en": "MIME", "fr": "MIME"},
        "snippet": {"en": "Snippet", "fr": "Extrait"},
    },
)
|
|
|
|
|
|
# Summary of an action's outcome: success flag, result label, document count,
# plus optional previews and notes (both default to empty lists).
class Observation(BaseModel, ModelMixin):
    success: bool = Field(..., description="Action execution success flag")
    resultLabel: str = Field(
        ...,
        description="Deterministic label for produced documents",
    )
    documentsCount: int = Field(..., description="Number of produced documents")
    previews: List[ObservationPreview] = Field(
        default_factory=list,
        description="Compact previews of outputs",
    )
    notes: List[str] = Field(
        default_factory=list,
        description="Short notes or key facts",
    )
|
|
|
|
|
|
# Register en/fr labels for the Observation model and each of its fields.
register_model_labels(
    "Observation",
    {"en": "Observation", "fr": "Observation"},
    {
        "success": {"en": "Success", "fr": "Succès"},
        "resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"},
        "documentsCount": {"en": "Documents Count", "fr": "Nombre de documents"},
        "previews": {"en": "Previews", "fr": "Aperçus"},
        "notes": {"en": "Notes", "fr": "Notes"},
    },
)
|
|
|
|
|
|
class TaskStatus(str):
    """Status constants for tasks and actions.

    This is a plain ``str`` subclass used as a namespace, not an ``Enum``:
    each attribute below is an ordinary ``str`` value.
    """

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
|
|
|
|
|
|
# Register en/fr labels for TaskStatus and for each of its status constants.
register_model_labels(
    "TaskStatus",
    {"en": "Task Status", "fr": "Statut de la tâche"},
    {
        "PENDING": {"en": "Pending", "fr": "En attente"},
        "RUNNING": {"en": "Running", "fr": "En cours"},
        "COMPLETED": {"en": "Completed", "fr": "Terminé"},
        "FAILED": {"en": "Failed", "fr": "Échec"},
        "CANCELLED": {"en": "Cancelled", "fr": "Annulé"},
    },
)
|
|
|
|
|
|
# A labelled set of document references (references are plain strings).
class DocumentExchange(BaseModel, ModelMixin):
    documentsLabel: str = Field(
        ...,
        description="Label for the set of documents",
    )
    documents: List[str] = Field(
        default_factory=list,
        description="List of document references",
    )
|
|
|
|
|
|
# Register en/fr labels for the DocumentExchange model and each of its fields.
register_model_labels(
    "DocumentExchange",
    {"en": "Document Exchange", "fr": "Échange de documents"},
    {
        "documentsLabel": {"en": "Documents Label", "fr": "Label des documents"},
        "documents": {"en": "Documents", "fr": "Documents"},
    },
)
|
|
|
|
|
|
# One planned or executed action of a task: what to run (execMethod/execAction
# with execParameters), where its documents are routed (execResultLabel), and
# bookkeeping such as status, retries, timing and the result text.
class TaskAction(BaseModel, ModelMixin):
    id: str = Field(description="Action ID")
    execMethod: str = Field(description="Method to execute")
    execAction: str = Field(description="Action to perform")
    execParameters: Dict[str, Any] = Field(
        default_factory=dict,
        description="Action parameters",
    )
    execResultLabel: Optional[str] = Field(
        default=None,
        description="Label for the set of result documents",
    )
    expectedDocumentFormats: Optional[List[Dict[str, str]]] = Field(
        default=None,
        description="Expected document formats (optional)",
    )
    userMessage: Optional[str] = Field(
        default=None,
        description="User-friendly message in user's language",
    )
    # The default is the plain string constant TaskStatus.PENDING ("pending").
    status: TaskStatus = Field(TaskStatus.PENDING, description="Action status")
    error: Optional[str] = Field(
        default=None,
        description="Error message if action failed",
    )
    retryCount: int = Field(0, description="Number of retries attempted")
    retryMax: int = Field(3, description="Maximum number of retries")
    processingTime: Optional[float] = Field(
        default=None,
        description="Processing time in seconds",
    )
    timestamp: float = Field(
        description="When the action was executed (UTC timestamp in seconds)",
    )
    result: Optional[str] = Field(default=None, description="Result of the action")
|
|
|
|
|
|
# Register en/fr labels for the TaskAction model and each of its fields.
register_model_labels(
    "TaskAction",
    {"en": "Task Action", "fr": "Action de tâche"},
    {
        "id": {"en": "Action ID", "fr": "ID de l'action"},
        "execMethod": {"en": "Method", "fr": "Méthode"},
        "execAction": {"en": "Action", "fr": "Action"},
        "execParameters": {"en": "Parameters", "fr": "Paramètres"},
        "execResultLabel": {"en": "Result Label", "fr": "Label du résultat"},
        "expectedDocumentFormats": {
            "en": "Expected Document Formats",
            "fr": "Formats de documents attendus",
        },
        "userMessage": {"en": "User Message", "fr": "Message utilisateur"},
        "status": {"en": "Status", "fr": "Statut"},
        "error": {"en": "Error", "fr": "Erreur"},
        "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
        "retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
        "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
        "timestamp": {"en": "Timestamp", "fr": "Horodatage"},
        "result": {"en": "Result", "fr": "Résultat"},
    },
)
|
|
|
|
|
|
# Outcome of a task: its id, status, a success flag and optional feedback/error text.
class TaskResult(BaseModel, ModelMixin):
    taskId: str = Field(description="Task ID")
    # The default is the plain string constant TaskStatus.PENDING ("pending").
    status: TaskStatus = Field(TaskStatus.PENDING, description="Task status")
    success: bool = Field(description="Whether the task was successful")
    feedback: Optional[str] = Field(default=None, description="Task feedback message")
    error: Optional[str] = Field(
        default=None,
        description="Error message if task failed",
    )
|
|
|
|
|
|
# Register en/fr labels for the TaskResult model and each of its fields.
register_model_labels(
    "TaskResult",
    {"en": "Task Result", "fr": "Résultat de tâche"},
    {
        "taskId": {"en": "Task ID", "fr": "ID de la tâche"},
        "status": {"en": "Status", "fr": "Statut"},
        "success": {"en": "Success", "fr": "Succès"},
        "feedback": {"en": "Feedback", "fr": "Retour"},
        "error": {"en": "Error", "fr": "Erreur"},
    },
)
|
|
|
|
|
|
# A task inside a workflow: the triggering user input, the list of actions to
# run, dependencies on other task ids, and status/retry/timing bookkeeping.
class TaskItem(BaseModel, ModelMixin):
    id: str = Field(description="Task ID")
    workflowId: str = Field(description="Workflow ID")
    userInput: str = Field(description="User input that triggered the task")
    # The default is the plain string constant TaskStatus.PENDING ("pending").
    status: TaskStatus = Field(TaskStatus.PENDING, description="Task status")
    error: Optional[str] = Field(
        default=None,
        description="Error message if task failed",
    )
    startedAt: Optional[float] = Field(
        default=None,
        description="When the task started (UTC timestamp in seconds)",
    )
    finishedAt: Optional[float] = Field(
        default=None,
        description="When the task finished (UTC timestamp in seconds)",
    )
    actionList: List[TaskAction] = Field(
        default_factory=list,
        description="List of actions to execute",
    )
    retryCount: int = Field(0, description="Number of retries attempted")
    retryMax: int = Field(3, description="Maximum number of retries")
    rollbackOnFailure: bool = Field(True, description="Whether to rollback on failure")
    dependencies: List[str] = Field(
        default_factory=list,
        description="List of task IDs this task depends on",
    )
    feedback: Optional[str] = Field(default=None, description="Task feedback message")
    processingTime: Optional[float] = Field(
        default=None,
        description="Total processing time in seconds",
    )
    # Optional, but defaults to an empty dict rather than None.
    resultLabels: Optional[Dict[str, Any]] = Field(
        default_factory=dict,
        description="Map of result labels to their values",
    )
|
|
|
|
|
|
# Register en/fr labels for the TaskItem model and each of its fields.
# rollbackOnFailure, dependencies, feedback and resultLabels previously had no
# entry here, unlike the fields of every other registered model in this file.
register_model_labels(
    "TaskItem",
    {"en": "Task", "fr": "Tâche"},
    {
        "id": {"en": "Task ID", "fr": "ID de la tâche"},
        "workflowId": {"en": "Workflow ID", "fr": "ID du workflow"},
        "userInput": {"en": "User Input", "fr": "Entrée utilisateur"},
        "status": {"en": "Status", "fr": "Statut"},
        "error": {"en": "Error", "fr": "Erreur"},
        "startedAt": {"en": "Started At", "fr": "Démarré à"},
        "finishedAt": {"en": "Finished At", "fr": "Terminé à"},
        "actionList": {"en": "Actions", "fr": "Actions"},
        "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"},
        "retryMax": {"en": "Max Retries", "fr": "Tentatives max"},
        "rollbackOnFailure": {
            "en": "Rollback On Failure",
            "fr": "Annulation en cas d'échec",
        },
        "dependencies": {"en": "Dependencies", "fr": "Dépendances"},
        "feedback": {"en": "Feedback", "fr": "Retour"},
        "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"},
        "resultLabels": {"en": "Result Labels", "fr": "Étiquettes de résultat"},
    },
)
|
|
|
|
|
|
# One step of a task plan: an id and objective, plus optional dependencies,
# success criteria, a complexity estimate and a user-facing message.
class TaskStep(BaseModel, ModelMixin):
    id: str
    objective: str
    # Optional lists default to a fresh empty list, not None.
    dependencies: Optional[list[str]] = Field(default_factory=list)
    success_criteria: Optional[list[str]] = Field(default_factory=list)
    estimated_complexity: Optional[str] = Field(default=None)
    userMessage: Optional[str] = Field(
        default=None,
        description="User-friendly message in user's language",
    )
|
|
|
|
|
|
# Register en/fr labels for the TaskStep model and each of its fields.
register_model_labels(
    "TaskStep",
    {"en": "Task Step", "fr": "Étape de tâche"},
    {
        "id": {"en": "ID", "fr": "ID"},
        "objective": {"en": "Objective", "fr": "Objectif"},
        "dependencies": {"en": "Dependencies", "fr": "Dépendances"},
        "success_criteria": {"en": "Success Criteria", "fr": "Critères de succès"},
        "estimated_complexity": {
            "en": "Estimated Complexity",
            "fr": "Complexité estimée",
        },
        "userMessage": {"en": "User Message", "fr": "Message utilisateur"},
    },
)
|
|
|
|
|
|
# Data passed along at a handover: target/source task ids, input and output
# document sets, free-form context, summaries of earlier results and messages,
# a creation timestamp and the handover type.
class TaskHandover(BaseModel, ModelMixin):
    taskId: str = Field(..., description="Target task ID")
    sourceTask: Optional[str] = Field(default=None, description="Source task ID")
    inputDocuments: List[DocumentExchange] = Field(
        default_factory=list,
        description="Available input documents",
    )
    outputDocuments: List[DocumentExchange] = Field(
        default_factory=list,
        description="Produced output documents",
    )
    context: Dict[str, Any] = Field(default_factory=dict, description="Task context")
    previousResults: List[str] = Field(
        default_factory=list,
        description="Previous result summaries",
    )
    improvements: List[str] = Field(
        default_factory=list,
        description="Improvement suggestions",
    )
    workflowSummary: Optional[str] = Field(
        default=None,
        description="Summarized workflow context",
    )
    messageHistory: List[str] = Field(
        default_factory=list,
        description="Key message summaries",
    )
    timestamp: float = Field(
        description="When the handover was created (UTC timestamp in seconds)",
    )
    handoverType: str = Field(
        "task",
        description="Type of handover: task, phase, or workflow",
    )
|
|
|
|
|
|
# Register en/fr labels for the TaskHandover model and each of its fields.
register_model_labels(
    "TaskHandover",
    {"en": "Task Handover", "fr": "Transfert de tâche"},
    {
        "taskId": {"en": "Task ID", "fr": "ID de la tâche"},
        "sourceTask": {"en": "Source Task", "fr": "Tâche source"},
        "inputDocuments": {"en": "Input Documents", "fr": "Documents d'entrée"},
        "outputDocuments": {"en": "Output Documents", "fr": "Documents de sortie"},
        "context": {"en": "Context", "fr": "Contexte"},
        "previousResults": {"en": "Previous Results", "fr": "Résultats précédents"},
        "improvements": {"en": "Improvements", "fr": "Améliorations"},
        "workflowSummary": {"en": "Workflow Summary", "fr": "Résumé du workflow"},
        "messageHistory": {"en": "Message History", "fr": "Historique des messages"},
        "timestamp": {"en": "Timestamp", "fr": "Horodatage"},
        "handoverType": {"en": "Handover Type", "fr": "Type de transfert"},
    },
)
|
|
|
|
|
|
# Context for working on one task step: the step itself, optional workflow
# references, the previous handover, and results/improvements/retry state
# accumulated so far.
class TaskContext(BaseModel, ModelMixin):
    task_step: TaskStep
    # NOTE(review): 'ChatWorkflow' is a forward reference that is neither defined
    # nor imported in the visible part of this file - confirm where it is resolved.
    workflow: Optional['ChatWorkflow'] = None
    workflow_id: Optional[str] = None
    available_documents: Optional[str] = "No documents available"
    available_connections: Optional[list[str]] = Field(default_factory=list)
    previous_results: Optional[list[str]] = Field(default_factory=list)
    previous_handover: Optional[TaskHandover] = None
    improvements: Optional[list[str]] = Field(default_factory=list)
    retry_count: Optional[int] = 0
    previous_action_results: Optional[list] = Field(default_factory=list)
    previous_review_result: Optional[dict] = None
    is_regeneration: Optional[bool] = False
    failure_patterns: Optional[list[str]] = Field(default_factory=list)
    failed_actions: Optional[list] = Field(default_factory=list)
    successful_actions: Optional[list] = Field(default_factory=list)
    criteria_progress: Optional[dict] = None

    def getDocumentReferences(self) -> List[str]:
        """Return the unique document references of the previous handover's inputs.

        Only ``previous_handover.inputDocuments`` is consulted. Duplicates are
        dropped and the order of first appearance is kept, so the result is
        the same on every run.

        Returns:
            The de-duplicated references, or an empty list when there is no
            previous handover.
        """
        if not self.previous_handover:
            return []
        # A dict keeps insertion order and re-inserting a key does not move it,
        # which gives order-preserving de-duplication.
        unique_refs: Dict[str, None] = {}
        for exchange in self.previous_handover.inputDocuments:
            unique_refs.update(dict.fromkeys(exchange.documents))
        return list(unique_refs)

    def addImprovement(self, improvement: str) -> None:
        """Record an improvement suggestion unless it is already present.

        Args:
            improvement: Text to append to ``improvements``. If ``improvements``
                is ``None`` it is first replaced by an empty list.
        """
        if self.improvements is None:
            self.improvements = []
        if improvement not in self.improvements:
            self.improvements.append(improvement)
|
|
|
|
|
|
# Inputs for reviewing a task step: the step, its actions and their results,
# the step result, the workflow id and earlier result summaries.
class ReviewContext(BaseModel, ModelMixin):
    task_step: TaskStep
    # Optional containers default to fresh empty containers, not None.
    task_actions: Optional[list] = Field(default_factory=list)
    action_results: Optional[list] = Field(default_factory=list)
    step_result: Optional[dict] = Field(default_factory=dict)
    workflow_id: Optional[str] = Field(default=None)
    previous_results: Optional[list[str]] = Field(default_factory=list)
|
|
|
|
|
|
# Result of a review: a status string plus optional reason, improvement
# suggestions, criteria lists, a quality score (default 5) and a confidence
# value (default 0.5).
class ReviewResult(BaseModel, ModelMixin):
    status: str
    reason: Optional[str] = Field(default=None)
    improvements: Optional[list[str]] = Field(default_factory=list)
    quality_score: Optional[int] = Field(default=5)
    missing_outputs: Optional[list[str]] = Field(default_factory=list)
    met_criteria: Optional[list[str]] = Field(default_factory=list)
    unmet_criteria: Optional[list[str]] = Field(default_factory=list)
    confidence: Optional[float] = Field(default=0.5)
    userMessage: Optional[str] = Field(
        default=None,
        description="User-friendly message in user's language",
    )
|
|
|
|
|
|
# Register en/fr labels for the ReviewResult model and each of its fields.
register_model_labels(
    "ReviewResult",
    {"en": "Review Result", "fr": "Résultat de l'évaluation"},
    {
        "status": {"en": "Status", "fr": "Statut"},
        "reason": {"en": "Reason", "fr": "Raison"},
        "improvements": {"en": "Improvements", "fr": "Améliorations"},
        "quality_score": {"en": "Quality Score", "fr": "Score de qualité"},
        "missing_outputs": {"en": "Missing Outputs", "fr": "Sorties manquantes"},
        "met_criteria": {"en": "Met Criteria", "fr": "Critères respectés"},
        "unmet_criteria": {"en": "Unmet Criteria", "fr": "Critères non respectés"},
        "confidence": {"en": "Confidence", "fr": "Confiance"},
        "userMessage": {"en": "User Message", "fr": "Message utilisateur"},
    },
)
|
|
|
|
|
|
# A plan: an overview text, the list of task steps and an optional user message.
class TaskPlan(BaseModel, ModelMixin):
    overview: str
    tasks: list[TaskStep]
    userMessage: Optional[str] = Field(
        default=None,
        description="Overall user-friendly message for the task plan",
    )
|
|
|
|
|
|
# Register en/fr labels for the TaskPlan model and each of its fields.
register_model_labels(
    "TaskPlan",
    {"en": "Task Plan", "fr": "Plan de tâches"},
    {
        "overview": {"en": "Overview", "fr": "Aperçu"},
        "tasks": {"en": "Tasks", "fr": "Tâches"},
        "userMessage": {"en": "User Message", "fr": "Message utilisateur"},
    },
)
|
|
|
|
|
|
|
|
|