From e25903dacae68dd3ca4272a1f0540b90f5bb2984 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Wed, 11 Jun 2025 00:38:26 +0200 Subject: [PATCH] cleanup chat part --- modules/interfaces/serviceAppAccess.py | 2 +- modules/interfaces/serviceChatAccess.py | 21 +- modules/interfaces/serviceChatClass.py | 185 +--- modules/interfaces/serviceChatModel.py | 242 +++-- modules/interfaces/serviceManagementClass.py | 41 - modules/routes/routeWorkflows.py | 61 +- modules/workflow/managerChat.py | 937 ++++++++++++------- modules/workflow/managerDocument.py | 2 - modules/workflow/managerWorkflow.py | 193 ++-- modules/workflow/serviceContainer.py | 11 +- notes/changelog.txt | 7 +- 11 files changed, 860 insertions(+), 842 deletions(-) diff --git a/modules/interfaces/serviceAppAccess.py b/modules/interfaces/serviceAppAccess.py index a30fa015..9ecac84c 100644 --- a/modules/interfaces/serviceAppAccess.py +++ b/modules/interfaces/serviceAppAccess.py @@ -145,7 +145,7 @@ class AppAccess: return filtered_records - def canModify(self, table: str, recordId: Optional[int] = None) -> bool: + def canModify(self, table: str, recordId: Optional[str] = None) -> bool: """ Checks if the current user can modify (create/update/delete) records in a table. diff --git a/modules/interfaces/serviceChatAccess.py b/modules/interfaces/serviceChatAccess.py index 119da202..12e37814 100644 --- a/modules/interfaces/serviceChatAccess.py +++ b/modules/interfaces/serviceChatAccess.py @@ -45,29 +45,16 @@ class ChatAccess: # Admins see records in their mandate filtered_records = [r for r in recordset if r.get("mandateId","-") == self.mandateId] else: # Regular users - # For prompts, users can see all prompts from their mandate - if table == "prompts": - filtered_records = [r for r in recordset if r.get("mandateId") == self.mandateId] - else: - # Users see only their records for other tables - filtered_records = [r for r in recordset - if r.get("mandateId","-") == self.mandateId and r.get("_createdBy") == self.userId] + # Users see only their records for other tables + filtered_records = [r for r in recordset + if r.get("mandateId","-") == self.mandateId and r.get("_createdBy") == self.userId] # Add access control attributes to each record for record in filtered_records: record_id = record.get("id") # Set access control flags based on user permissions - if table == "prompts": - record["_hideView"] = False # Everyone can view - record["_hideEdit"] = not self.canModify("prompts", record_id) - record["_hideDelete"] = not self.canModify("prompts", record_id) - elif table == "files": - record["_hideView"] = False # Everyone can view - record["_hideEdit"] = not self.canModify("files", record_id) - record["_hideDelete"] = not self.canModify("files", record_id) - record["_hideDownload"] = not self.canModify("files", record_id) - elif table == "workflows": + if table == "workflows": record["_hideView"] = False # Everyone can view record["_hideEdit"] = not self.canModify("workflows", record_id) record["_hideDelete"] = not self.canModify("workflows", record_id) diff --git a/modules/interfaces/serviceChatClass.py b/modules/interfaces/serviceChatClass.py index b14ee795..0b80150e 100644 --- a/modules/interfaces/serviceChatClass.py +++ b/modules/interfaces/serviceChatClass.py @@ -14,22 +14,20 @@ import asyncio from modules.interfaces.serviceChatAccess import ChatAccess from modules.interfaces.serviceChatModel import ( - ChatContent, ChatDocument, ChatStat, ChatMessage, - ChatLog, ChatWorkflow, Agent, AgentResponse, - Task, TaskPlan, UserInputRequest + TaskStatus, UserInputRequest, ContentMetadata, ContentItem, + ChatDocument, TaskDocument, ExtractedContent, TaskItem, + TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) from modules.interfaces.serviceAppModel import User -from modules.workflow.managerDocument import DocumentManager # DYNAMIC PART: Connectors to the Interface from modules.connectors.connectorDbJson import DatabaseConnector -from modules.connectors.connectorAiOpenai import ChatService # Basic Configurations from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) -# Singleton factory for Chat instances with AI service per context +# Singleton factory for Chat instances _chatInterfaces = {} class ChatInterface: @@ -57,15 +55,10 @@ class ChatInterface: self.setUserContext(currentUser) def _initializeServices(self): - """Initialize service dependencies""" - self.documentManager = DocumentManager(self.service) + pass def setUserContext(self, currentUser: User): """Sets the user context for the interface.""" - if not currentUser: - logger.info("Initializing interface without user context") - return - self.currentUser = currentUser # Store User object directly self.userId = currentUser.id self.mandateId = currentUser.mandateId @@ -110,59 +103,7 @@ class ChatInterface: def _initRecords(self): """Initializes standard records in the database if they don't exist.""" - try: - # Initialize standard prompts - self._initializeStandardPrompts() - - # Add other record initializations here - - logger.info("Standard records initialized successfully") - except Exception as e: - logger.error(f"Failed to initialize standard records: {str(e)}") - raise - - def _initializeStandardPrompts(self): - """Creates standard prompts if they don't exist.""" - prompts = self.db.getRecordset("prompts") - logger.debug(f"Found {len(prompts)} existing prompts") - - if not prompts: - logger.debug("Creating standard prompts") - - # Define standard prompts - standardPrompts = [ - { - "content": "Research the current market trends and developments in [TOPIC]. Collect information about leading companies, innovative products or services, and current challenges. Present the results in a structured overview with relevant data and sources.", - "name": "Web Research: Market Research" - }, - { - "content": "Analyze the attached dataset on [TOPIC] and identify the most important trends, patterns, and anomalies. Perform statistical calculations to support your findings. Present the results in a clearly structured analysis and draw relevant conclusions.", - "name": "Analysis: Data Analysis" - }, - { - "content": "Create a detailed protocol of our meeting on [TOPIC]. Capture all discussed points, decisions made, and agreed measures. Structure the protocol clearly with agenda items, participant list, and clear responsibilities for follow-up actions.", - "name": "Protocol: Meeting Minutes" - }, - { - "content": "Develop a UI/UX design concept for [APPLICATION/WEBSITE]. Consider the target audience, main functions, and brand identity. Describe the visual design, navigation, interaction patterns, and information architecture. Explain how the design optimizes user-friendliness and user experience.", - "name": "Design: UI/UX Design" - }, - { - "content": "Gib mir die ersten 1000 Primzahlen", - "name": "Code: Primzahlen" - }, - { - "content": "Bereite mir eine formelle E-Mail an peter.muster@domain.com vor, um meinen Termin von 10 Uhr auf Freitag zu scheiben.", - "name": "Mail: Vorbereitung" - }, - ] - - # Create prompts - for promptData in standardPrompts: - createdPrompt = self.db.recordCreate("prompts", promptData) - logger.debug(f"Prompt '{promptData.get('name', 'Standard')}' was created with ID {createdPrompt['id']} and context mandate={createdPrompt.get('mandateId')}, user={createdPrompt.get('_createdBy')}") - else: - logger.debug("Prompts already exist, skipping creation") + pass def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Delegate to access control module.""" @@ -182,47 +123,6 @@ class ChatInterface: """Delegate to access control module.""" return self.access.canModify(table, recordId) - # Language support method - - def setUserLanguage(self, languageCode: str): - """Set the user's preferred language""" - self.userLanguage = languageCode - logger.debug(f"User language set to: {languageCode}") - - # AI Call Root Function - - async def callAi(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str: - """Enhanced AI service call with language support.""" - if not self.aiService: - logger.error("AI service not set in LucydomInterface") - return "Error: AI service not available" - - # Add language instruction for user-facing responses - if produceUserAnswer and self.userLanguage: - ltext= f"Please respond in '{self.userLanguage}' language." - if messages and messages[0]["role"] == "system": - if "language" not in messages[0]["content"].lower(): - messages[0]["content"] = f"{ltext} {messages[0]['content']}" - else: - # Insert a system message with language instruction - messages.insert(0, { - "role": "system", - "content": ltext - }) - - # Call the AI service - if temperature is not None: - return await self.aiService.callApi(messages, temperature=temperature) - else: - return await self.aiService.callApi(messages) - - async def callAi4Image(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: - """Enhanced AI service call with language support.""" - if not self.aiService: - logger.error("AI service not set in LucydomInterface") - return "Error: AI service not available" - return await self.aiService.analyzeImage(imageData, mimeType, prompt) - # Utilities def getInitialId(self, table: str) -> Optional[str]: @@ -837,7 +737,7 @@ class ChatInterface: # Workflow Actions - async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: + async def workflowStart(self, currentUser: User, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: """ Starts a new workflow or continues an existing one. @@ -852,35 +752,12 @@ class ChatInterface: # Get current timestamp currentTime = self._getCurrentTimestamp() - # Process files if any - documents = [] - if userInput.listFileId: - documents = await self._processFileIds(userInput.listFileId) - - # Create initial message - initialMessage = ChatMessage( - id=str(uuid.uuid4()), - role="user", - content=userInput.prompt, - timestamp=currentTime, - documents=documents - ) - if workflowId: # Continue existing workflow workflow = self.getWorkflow(workflowId) if not workflow: raise ValueError(f"Workflow {workflowId} not found") - # Add message to workflow - self.createWorkflowMessage({ - "workflowId": workflowId, - "messageId": initialMessage.id, - "role": initialMessage.role, - "content": initialMessage.content, - "timestamp": initialMessage.timestamp, - "documents": [doc.dict() for doc in initialMessage.documents] - }) # Update workflow self.updateWorkflow(workflowId, { @@ -897,10 +774,10 @@ class ChatInterface: "lastActivity": currentTime, "currentRound": 1, "mandateId": self.mandateId, - "messageIds": [initialMessage.id], + "messageIds": [], "dataStats": { - "totalMessages": 1, - "totalDocuments": len(documents), + "totalMessages": 0, + "totalDocuments": 0, "totalTokens": 0 } } @@ -908,16 +785,6 @@ class ChatInterface: # Create workflow workflow = self.createWorkflow(workflowData) - # Add initial message - self.createWorkflowMessage({ - "workflowId": workflow.id, - "messageId": initialMessage.id, - "role": initialMessage.role, - "content": initialMessage.content, - "timestamp": initialMessage.timestamp, - "documents": [doc.dict() for doc in initialMessage.documents] - }) - # Add log entry self.createWorkflowLog({ "workflowId": workflow.id, @@ -929,7 +796,7 @@ class ChatInterface: # Start workflow processing from modules.workflow.managerWorkflow import WorkflowManager - workflowManager = WorkflowManager(self) + workflowManager = WorkflowManager(self, currentUser) asyncio.create_task(workflowManager.workflowProcess(userInput, workflow)) return workflow @@ -979,36 +846,6 @@ class ChatInterface: logger.error(f"Error stopping workflow: {str(e)}") raise - async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: - """ - Process multiple files and extract their contents. - - Args: - fileIds: List of file IDs to process - - Returns: - List of ChatDocument objects - """ - documents = [] - for fileId in fileIds: - # Get file metadata - fileMetadata = self.service.functions.getFile(fileId) - if not fileMetadata: - logger.warning(f"File metadata not found for {fileId}") - continue - - # Create ChatDocument - document = ChatDocument( - id=str(uuid.uuid4()), - fileId=fileId, - filename=fileMetadata.get("name", "Unknown"), - fileSize=fileMetadata.get("size", 0), - mimeType=fileMetadata.get("mimeType", "text/plain") - ) - - documents.append(document) - return documents - def getInterface(currentUser: Optional[User] = None) -> 'ChatInterface': """ diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py index fbf4ef30..1d65d26e 100644 --- a/modules/interfaces/serviceChatModel.py +++ b/modules/interfaces/serviceChatModel.py @@ -10,8 +10,7 @@ from enum import Enum from modules.shared.attributeUtils import register_model_labels, ModelMixin - -# ENUMS +# ===== Base Enums and Simple Models ===== class TaskStatus(str, Enum): """Task status enumeration""" @@ -36,13 +35,12 @@ register_model_labels( } ) -# USER MODELS - class UserInputRequest(BaseModel, ModelMixin): """Data model for a user input request""" prompt: str = Field(description="Prompt for the user") listFileId: List[str] = Field(default_factory=list, description="List of file IDs") userLanguage: str = Field(default="en", description="User's preferred language") + # Register labels for UserInputRequest register_model_labels( "UserInputRequest", @@ -54,14 +52,13 @@ register_model_labels( } ) -# DOCUMENT MODELS +# ===== Content Models ===== class ContentMetadata(BaseModel, ModelMixin): """Metadata for content items""" size: int = Field(description="Content size in bytes") pages: Optional[int] = Field(None, description="Number of pages for multi-page content") error: Optional[str] = Field(None, description="Processing error if any") - # Media-specific attributes width: Optional[int] = Field(None, description="Width in pixels for images/videos") height: Optional[int] = Field(None, description="Height in pixels for images/videos") colorMode: Optional[str] = Field(None, description="Color mode (e.g., RGB, CMYK, grayscale)") @@ -162,96 +159,10 @@ register_model_labels( } ) -# WORKFLOW MODELS +# ===== Task Models ===== -class ChatStat(BaseModel, ModelMixin): - """Data model for chat statistics""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - processingTime: Optional[float] = Field(None, description="Processing time in seconds") - tokenCount: Optional[int] = Field(None, description="Number of tokens processed") - bytesSent: Optional[int] = Field(None, description="Number of bytes sent") - bytesReceived: Optional[int] = Field(None, description="Number of bytes received") - successRate: Optional[float] = Field(None, description="Success rate of operations") - errorCount: Optional[int] = Field(None, description="Number of errors encountered") -# Register labels for ChatStat -register_model_labels( - "ChatStat", - {"en": "Chat Statistics", "fr": "Statistiques de chat"}, - { - "id": {"en": "ID", "fr": "ID"}, - "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"}, - "tokenCount": {"en": "Token Count", "fr": "Nombre de tokens"}, - "bytesSent": {"en": "Bytes Sent", "fr": "Octets envoyés"}, - "bytesReceived": {"en": "Bytes Received", "fr": "Octets reçus"}, - "successRate": {"en": "Success Rate", "fr": "Taux de succès"}, - "errorCount": {"en": "Error Count", "fr": "Nombre d'erreurs"} - } -) - -class ChatLog(BaseModel, ModelMixin): - """Data model for a chat log""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - workflowId: str = Field(description="Foreign key to workflow") - message: str = Field(description="Log message") - type: str = Field(description="Type of log entry") - timestamp: str = Field(description="Timestamp of the log entry") - agentName: str = Field(description="Name of the agent") - status: str = Field(description="Status of the log entry") - progress: Optional[int] = Field(None, description="Progress percentage") - performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") -# Register labels for ChatLog -register_model_labels( - "ChatLog", - {"en": "Chat Log", "fr": "Journal de chat"}, - { - "id": {"en": "ID", "fr": "ID"}, - "workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"}, - "message": {"en": "Message", "fr": "Message"}, - "type": {"en": "Type", "fr": "Type"}, - "timestamp": {"en": "Timestamp", "fr": "Horodatage"}, - "agentName": {"en": "Agent Name", "fr": "Nom de l'agent"}, - "status": {"en": "Status", "fr": "Statut"}, - "progress": {"en": "Progress", "fr": "Progression"}, - "performance": {"en": "Performance", "fr": "Performance"} - } -) - -class ChatMessage(BaseModel, ModelMixin): - """Data model for a chat message""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - workflowId: str = Field(description="Foreign key to workflow") - parentMessageId: Optional[str] = Field(None, description="Parent message ID for threading") - agentName: Optional[str] = Field(None, description="Name of the agent") - documents: List[ChatDocument] = Field(default_factory=list, description="Associated documents") - message: Optional[str] = Field(None, description="Message content") - role: str = Field(description="Role of the message sender") - status: str = Field(description="Status of the message (first, step, last)") - sequenceNr: int = Field(description="Sequence number of the message (set automatically)") - publishedAt: str = Field(description="When the message was published") - stats: Optional[ChatStat] = Field(None, description="Statistics for this message") - success: Optional[bool] = Field(None, description="Whether the message processing was successful") -# Register labels for ChatMessage -register_model_labels( - "ChatMessage", - {"en": "Chat Message", "fr": "Message de chat"}, - { - "id": {"en": "ID", "fr": "ID"}, - "workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"}, - "parentMessageId": {"en": "Parent Message ID", "fr": "ID du message parent"}, - "agentName": {"en": "Agent Name", "fr": "Nom de l'agent"}, - "documents": {"en": "Documents", "fr": "Documents"}, - "message": {"en": "Message", "fr": "Message"}, - "role": {"en": "Role", "fr": "Rôle"}, - "status": {"en": "Status", "fr": "Statut"}, - "sequenceNr": {"en": "Sequence Number", "fr": "Numéro de séquence"}, - "publishedAt": {"en": "Published At", "fr": "Publié le"}, - "stats": {"en": "Statistics", "fr": "Statistiques"}, - "success": {"en": "Success", "fr": "Succès"} - } -) - -class AgentTask(BaseModel, ModelMixin): - """Model for agent tasks""" +class TaskItem(BaseModel, ModelMixin): + """Model for tasks""" id: str = Field(..., description="Unique task identifier") workflowId: str = Field(..., description="Associated workflow ID") status: TaskStatus = Field(default=TaskStatus.PENDING, description="Current task status") @@ -264,7 +175,7 @@ class AgentTask(BaseModel, ModelMixin): retryMax: int = Field(default=3, description="Maximum number of retry attempts") rollbackOnFailure: bool = Field(default=True, description="Whether to rollback on failure") dependencies: List[str] = Field(default_factory=list, description="List of dependent task IDs") - thisTaskFeedback: Optional[Dict[str, Any]] = Field(None, description="Task feedback data") + feedback: Optional[Dict[str, Any]] = Field(None, description="Task feedback data") def isCompleted(self) -> bool: """Check if task is completed""" @@ -328,11 +239,11 @@ class AgentTask(BaseModel, ModelMixin): def setFeedback(self, feedback: Dict[str, Any]) -> None: """Set task feedback""" - self.thisTaskFeedback = feedback + self.feedback = feedback -# Register labels for AgentTask +# Register labels for TaskItem register_model_labels( - "AgentTask", + "TaskItem", {"en": "Task", "fr": "Tâche"}, { "id": {"en": "ID", "fr": "ID"}, @@ -347,11 +258,137 @@ register_model_labels( "retryMax": {"en": "Max Retries", "fr": "Tentatives maximales"}, "rollbackOnFailure": {"en": "Rollback on Failure", "fr": "Annulation en cas d'échec"}, "dependencies": {"en": "Dependencies", "fr": "Dépendances"}, - "thisTaskFeedback": {"en": "Task Feedback", "fr": "Retour sur la tâche"} + "feedback": {"en": "Task Feedback", "fr": "Retour sur la tâche"} } ) -# WORKFLOW MODEL +class TaskResult(BaseModel, ModelMixin): + """Model for task execution results""" + taskId: str = Field(..., description="ID of the task this result belongs to") + status: TaskStatus = Field(..., description="Result status") + success: bool = Field(..., description="Whether the task was successful") + error: Optional[str] = Field(None, description="Error message if task failed") + data: Optional[Dict[str, Any]] = Field(None, description="Result data") + documents: List[ChatDocument] = Field(default_factory=list, description="Output documents") + feedback: Optional[str] = Field(None, description="Task feedback message") + processingTime: Optional[float] = Field(None, description="Processing time in seconds") + timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC), description="When the result was created") + + def isSuccessful(self) -> bool: + """Check if result indicates success""" + return self.success and self.status == TaskStatus.COMPLETED + + def hasError(self) -> bool: + """Check if result has an error""" + return not self.success or self.status == TaskStatus.FAILED + + def getErrorMessage(self) -> Optional[str]: + """Get error message if any""" + return self.error if self.hasError() else None + +# Register labels for TaskResult +register_model_labels( + "TaskResult", + {"en": "Task Result", "fr": "Résultat de la tâche"}, + { + "taskId": {"en": "Task ID", "fr": "ID de la tâche"}, + "status": {"en": "Status", "fr": "Statut"}, + "success": {"en": "Success", "fr": "Succès"}, + "error": {"en": "Error", "fr": "Erreur"}, + "data": {"en": "Data", "fr": "Données"}, + "documents": {"en": "Documents", "fr": "Documents"}, + "feedback": {"en": "Feedback", "fr": "Retour"}, + "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"}, + "timestamp": {"en": "Timestamp", "fr": "Horodatage"} + } +) + +# ===== Message and Workflow Models ===== + +class ChatStat(BaseModel, ModelMixin): + """Data model for chat statistics""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") + processingTime: Optional[float] = Field(None, description="Processing time in seconds") + tokenCount: Optional[int] = Field(None, description="Number of tokens processed") + bytesSent: Optional[int] = Field(None, description="Number of bytes sent") + bytesReceived: Optional[int] = Field(None, description="Number of bytes received") + successRate: Optional[float] = Field(None, description="Success rate of operations") + errorCount: Optional[int] = Field(None, description="Number of errors encountered") + +# Register labels for ChatStat +register_model_labels( + "ChatStat", + {"en": "Chat Statistics", "fr": "Statistiques de chat"}, + { + "id": {"en": "ID", "fr": "ID"}, + "processingTime": {"en": "Processing Time", "fr": "Temps de traitement"}, + "tokenCount": {"en": "Token Count", "fr": "Nombre de tokens"}, + "bytesSent": {"en": "Bytes Sent", "fr": "Octets envoyés"}, + "bytesReceived": {"en": "Bytes Received", "fr": "Octets reçus"}, + "successRate": {"en": "Success Rate", "fr": "Taux de succès"}, + "errorCount": {"en": "Error Count", "fr": "Nombre d'erreurs"} + } +) + +class ChatLog(BaseModel, ModelMixin): + """Data model for a chat log""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") + workflowId: str = Field(description="Foreign key to workflow") + message: str = Field(description="Log message") + type: str = Field(description="Type of log entry") + timestamp: str = Field(description="Timestamp of the log entry") + status: str = Field(description="Status of the log entry") + progress: Optional[int] = Field(None, description="Progress percentage") + performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") + +# Register labels for ChatLog +register_model_labels( + "ChatLog", + {"en": "Chat Log", "fr": "Journal de chat"}, + { + "id": {"en": "ID", "fr": "ID"}, + "workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"}, + "message": {"en": "Message", "fr": "Message"}, + "type": {"en": "Type", "fr": "Type"}, + "timestamp": {"en": "Timestamp", "fr": "Horodatage"}, + "status": {"en": "Status", "fr": "Statut"}, + "progress": {"en": "Progress", "fr": "Progression"}, + "performance": {"en": "Performance", "fr": "Performance"} + } +) + +class ChatMessage(BaseModel, ModelMixin): + """Data model for a chat message""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") + workflowId: str = Field(description="Foreign key to workflow") + parentMessageId: Optional[str] = Field(None, description="Parent message ID for threading") + documents: List[ChatDocument] = Field(default_factory=list, description="Associated documents") + message: Optional[str] = Field(None, description="Message content") + role: str = Field(description="Role of the message sender") + status: str = Field(description="Status of the message (first, step, last)") + sequenceNr: int = Field(description="Sequence number of the message (set automatically)") + publishedAt: str = Field(description="When the message was published") + stats: Optional[ChatStat] = Field(None, description="Statistics for this message") + success: Optional[bool] = Field(None, description="Whether the message processing was successful") + +# Register labels for ChatMessage +register_model_labels( + "ChatMessage", + {"en": "Chat Message", "fr": "Message de chat"}, + { + "id": {"en": "ID", "fr": "ID"}, + "workflowId": {"en": "Workflow ID", "fr": "ID du flux de travail"}, + "parentMessageId": {"en": "Parent Message ID", "fr": "ID du message parent"}, + "documents": {"en": "Documents", "fr": "Documents"}, + "message": {"en": "Message", "fr": "Message"}, + "role": {"en": "Role", "fr": "Rôle"}, + "status": {"en": "Status", "fr": "Statut"}, + "sequenceNr": {"en": "Sequence Number", "fr": "Numéro de séquence"}, + "publishedAt": {"en": "Published At", "fr": "Publié le"}, + "stats": {"en": "Statistics", "fr": "Statistiques"}, + "success": {"en": "Success", "fr": "Succès"} + } +) class ChatWorkflow(BaseModel, ModelMixin): """Data model for a chat workflow""" @@ -365,7 +402,8 @@ class ChatWorkflow(BaseModel, ModelMixin): logs: List[ChatLog] = Field(default_factory=list, description="Workflow logs") messages: List[ChatMessage] = Field(default_factory=list, description="Messages in the workflow") stats: Optional[ChatStat] = Field(None, description="Workflow statistics") - tasks: List[AgentTask] = Field(default_factory=list, description="List of tasks in the workflow") + tasks: List[TaskItem] = Field(default_factory=list, description="List of tasks in the workflow") + # Register labels for ChatWorkflow register_model_labels( "ChatWorkflow", diff --git a/modules/interfaces/serviceManagementClass.py b/modules/interfaces/serviceManagementClass.py index bdead684..bf97d845 100644 --- a/modules/interfaces/serviceManagementClass.py +++ b/modules/interfaces/serviceManagementClass.py @@ -241,47 +241,6 @@ class ServiceManagement: """Delegate to access control module.""" return self.access.canModify(table, recordId) - # Language support method - - def setUserLanguage(self, languageCode: str): - """Set the user's preferred language""" - self.userLanguage = languageCode - logger.debug(f"User language set to: {languageCode}") - - # AI Call Root Function - - async def callAi(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str: - """Enhanced AI service call with language support.""" - if not self.aiService: - logger.error("AI service not set in ServiceManagement") - return "Error: AI service not available" - - # Add language instruction for user-facing responses - if produceUserAnswer and self.userLanguage: - ltext= f"Please respond in '{self.userLanguage}' language." - if messages and messages[0]["role"] == "system": - if "language" not in messages[0]["content"].lower(): - messages[0]["content"] = f"{ltext} {messages[0]['content']}" - else: - # Insert a system message with language instruction - messages.insert(0, { - "role": "system", - "content": ltext - }) - - # Call the AI service - if temperature is not None: - return await self.aiService.callApi(messages, temperature=temperature) - else: - return await self.aiService.callApi(messages) - - async def callAi4Image(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: - """Enhanced AI service call with language support.""" - if not self.aiService: - logger.error("AI service not set in ServiceManagement") - return "Error: AI service not available" - return await self.aiService.analyzeImage(imageData, mimeType, prompt) - # Utilities def getInitialId(self, table: str) -> Optional[str]: diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index 81e6e123..a96ca2b4 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -18,9 +18,6 @@ from modules.security.auth import limiter, getCurrentUser import modules.interfaces.serviceChatClass as serviceChatClass from modules.interfaces.serviceChatClass import getInterface -# Import workflow manager -from modules.workflow.workflowManager import getWorkflowManager - # Import models from modules.interfaces.serviceChatModel import ( ChatWorkflow, @@ -46,18 +43,8 @@ router = APIRouter( responses={404: {"description": "Not found"}} ) -def createServiceContainer(currentUser: Dict[str, Any]): - """Create a service container with all required interfaces.""" - # Get all interfaces - chatInterface = serviceChatClass.getInterface(currentUser) - - # Create service container - service = type('ServiceContainer', (), { - 'user': currentUser, - 'functions': chatInterface - }) - - return service +def getServiceChat(currentUser: User): + return serviceChatClass.getInterface(currentUser) # Consolidated endpoint for getting all workflows @router.get("/", response_model=List[ChatWorkflow]) @@ -117,10 +104,10 @@ async def get_workflow_status( """Get the current status of a workflow.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Retrieve workflow - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -149,10 +136,10 @@ async def get_workflow_logs( """Get logs for a workflow with support for selective data transfer.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -160,7 +147,7 @@ async def get_workflow_logs( ) # Get all logs - allLogs = service.base.getWorkflowLogs(workflowId) + allLogs = serviceChat.getWorkflowLogs(workflowId) # Apply selective data transfer if logId is provided if logId: @@ -192,10 +179,10 @@ async def get_workflow_messages( """Get messages for a workflow with support for selective data transfer.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -203,7 +190,7 @@ async def get_workflow_messages( ) # Get all messages - allMessages = service.base.getWorkflowMessages(workflowId) + allMessages = serviceChat.getWorkflowMessages(workflowId) # Apply selective data transfer if messageId is provided if messageId: @@ -238,10 +225,10 @@ async def start_workflow( """ try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Start or continue workflow using ChatInterface - workflow = await service.functions.workflowStart(userInput, workflowId) + workflow = await serviceChat.workflowStart(currentUser, userInput, workflowId) return ChatWorkflow(**workflow) @@ -263,10 +250,10 @@ async def stop_workflow( """Stops a running workflow.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Stop workflow using ChatInterface - workflow = await service.functions.workflowStop(workflowId) + workflow = await serviceChat.workflowStop(workflowId) return ChatWorkflow(**workflow) @@ -288,10 +275,10 @@ async def delete_workflow( """Deletes a workflow and its associated data.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -306,7 +293,7 @@ async def delete_workflow( ) # Delete workflow - success = service.base.deleteWorkflow(workflowId) + success = serviceChat.deleteWorkflow(workflowId) if not success: raise HTTPException( @@ -341,10 +328,10 @@ async def delete_workflow_message( """Delete a message from a workflow.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -352,7 +339,7 @@ async def delete_workflow_message( ) # Delete the message - success = service.base.deleteWorkflowMessage(workflowId, messageId) + success = serviceChat.deleteWorkflowMessage(workflowId, messageId) if not success: raise HTTPException( @@ -364,7 +351,7 @@ async def delete_workflow_message( messageIds = workflow.get("messageIds", []) if messageId in messageIds: messageIds.remove(messageId) - service.base.updateWorkflow(workflowId, {"messageIds": messageIds}) + serviceChat.updateWorkflow(workflowId, {"messageIds": messageIds}) return { "workflowId": workflowId, @@ -392,10 +379,10 @@ async def delete_file_from_message( """Delete a file reference from a message in a workflow.""" try: # Get service container - service = createServiceContainer(currentUser) + serviceChat = getServiceChat(currentUser) # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + workflow = serviceChat.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -403,7 +390,7 @@ async def delete_file_from_message( ) # Delete file reference from message - success = service.base.deleteFileFromMessage(workflowId, messageId, fileId) + success = serviceChat.deleteFileFromMessage(workflowId, messageId, fileId) if not success: raise HTTPException( diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py index 1620d76e..90d07904 100644 --- a/modules/workflow/managerChat.py +++ b/modules/workflow/managerChat.py @@ -2,34 +2,55 @@ import logging import importlib import pkgutil import inspect -from typing import Dict, Any, Optional, List, Type, Callable, Awaitable +from typing import Dict, Any, Optional, List, Type, Callable, Awaitable, Union from datetime import datetime, UTC import json -import asyncio import base64 +import uuid +from modules.interfaces.serviceAppClass import User from modules.methods.methodBase import MethodBase, AuthSource, MethodResult from modules.workflow.serviceContainer import ServiceContainer from modules.interfaces.serviceChatModel import ( - AgentTask, AgentAction, AgentResult, Action, TaskStatus, ChatWorkflow, - ChatMessage, ChatDocument, ChatStat, ExtractedContent, ContentItem + TaskStatus, UserInputRequest, ContentMetadata, ContentItem, + ChatDocument, TaskDocument, ExtractedContent, TaskItem, + TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) from modules.workflow.processorDocument import DocumentProcessor -from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) class ChatManager: """Chat manager with improved AI integration and method handling""" - def __init__(self): - self.service = ServiceContainer() + def __init__(self, currentUser: User): self._discoverMethods() self.workflow: Optional[ChatWorkflow] = None - self.currentTask: Optional[AgentTask] = None + self.currentTask: Optional[TaskItem] = None self.workflowHistory: List[ChatMessage] = [] self.documentProcessor = DocumentProcessor() + self.userLanguage = None + self.currentUser = currentUser + # ===== Initialization and Setup ===== + async def initialize(self, workflow: ChatWorkflow) -> None: + """Initialize chat manager with workflow""" + self.service.workflow = workflow + + # Initialize AI model + self.service.model = { + 'callAiBasic': self._callAiBasic, + 'callAiAdvanced': self._callAiAdvanced + } + + # Initialize document processor + self.service.documentProcessor.initialize() + + def setUserLanguage(self, languageCode: str): + """Set the user's preferred language""" + self.userLanguage = languageCode + logger.debug(f"User language set to: {languageCode}") + def _discoverMethods(self): """Dynamically discover all method classes in modules.methods package""" try: @@ -59,375 +80,159 @@ class ChatManager: except Exception as e: logger.error(f"Error discovering methods: {str(e)}") - async def initialize(self, workflow: ChatWorkflow) -> None: - """Initialize chat manager with workflow""" - self.service.workflow = workflow - - # Initialize AI model - self.service.model = { - 'callAiBasic': self._callAiBasic, - 'callAiAdvanced': self._callAiAdvanced - } - - # Initialize document processor - self.service.documentProcessor.initialize() - - def _generatePrompt(self, task: str, document: ChatDocument, examples: List[Dict[str, str]] = None) -> str: - """Generate a prompt based on task and document""" + # ===== Task Creation and Management ===== + async def createInitialTask(self, workflow: ChatWorkflow, initialMessage: ChatMessage) -> Optional[TaskItem]: + """Create the initial task from the first message""" try: - # Create base prompt - prompt = f"""Task: {task} -Document: {document.filename} ({document.mimeType}) - -""" + # Get available methods and their actions + methodCatalog = self.service.getAvailableMethods() - # Add examples if provided - if examples: - prompt += "\nExamples:\n" - for example in examples: - prompt += f"Input: {example.get('input', '')}\n" - prompt += f"Output: {example.get('output', '')}\n\n" + # Process user input with AI + processedInput = await self._processUserInput(initialMessage.message, methodCatalog) - return prompt + # Create actions from processed input + actions = await self._createActions(processedInput['actions']) + + # Create task + task = TaskItem( + id=f"task_{datetime.now(UTC).timestamp()}", + workflowId=workflow.id, + userInput=processedInput['objective'], + dataList=initialMessage.documents, + actionList=actions, + status=TaskStatus.PENDING, + createdAt=datetime.now(UTC), + updatedAt=datetime.now(UTC) + ) + + # Add task to workflow + workflow.tasks.append(task) + return task except Exception as e: - logger.error(f"Error generating prompt: {str(e)}") - return "" - - async def createInitialTask(self, userInput: Dict[str, Any]) -> AgentTask: - """Create initial task from user input""" - # Get available methods and their actions - methodCatalog = self.service.getAvailableMethods() - - # Process user input with AI - processedInput = await self._processUserInput(userInput, methodCatalog) - - # Create actions from processed input - actions = await self._createActions(processedInput['actions']) - - # Create task - task = AgentTask( - id=f"task_{datetime.now(UTC).timestamp()}", - workflowId=self.workflow.id, - userInput=processedInput['objective'], - dataList=userInput.get('connections', []), - actionList=actions, - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) - - # Store in service - self.service.tasks['current'] = task - return task - - async def executeCurrentTask(self) -> None: - """Execute current task""" - task = self.service.tasks.get('current') - if not task: - raise ValueError("No current task to execute") - - await self.service.executeTask(task) - - async def defineNextTask(self) -> Optional[AgentTask]: - """Define next task based on current task results""" - current_task = self.service.tasks.get('current') - if not current_task: + logger.error(f"Error creating initial task: {str(e)}") return None - + + async def createNextTask(self, workflow: ChatWorkflow, previousResult: TaskResult) -> Optional[TaskItem]: + """Create next task based on previous result""" try: - # Analyze task results - analysis = await self._analyzeTaskResults(current_task) - - # If workflow is complete, update task status - if analysis['isComplete']: - current_task.status = TaskStatus.COMPLETED - current_task.updatedAt = datetime.now(UTC) + # Check if previous result was successful + if not previousResult.success: + logger.error(f"Previous task failed: {previousResult.error}") return None - - # If more actions needed, create next task - if not analysis['isComplete']: - next_task = self._createNextTask(current_task, analysis) - self.service.tasks['previous'] = current_task - self.service.tasks['current'] = next_task - return next_task - + + # Extract task data from previous result + taskData = previousResult.data + if not taskData: + logger.error("No task data in previous result") + return None + + # Create next task + nextTask = TaskItem( + id=f"task_{datetime.now(UTC).timestamp()}", + workflowId=workflow.id, + userInput=taskData.get('objective', ''), + actionList=await self._createActions(taskData.get('actions', [])), + status=TaskStatus.PENDING, + createdAt=datetime.now(UTC), + updatedAt=datetime.now(UTC) + ) + + # Add task to workflow + workflow.tasks.append(nextTask) + return nextTask + except Exception as e: - logger.error(f"Error defining next task: {e}") - current_task.status = TaskStatus.FAILED - current_task.updatedAt = datetime.now(UTC) + logger.error(f"Error creating next task: {str(e)}") return None - - async def _processUserInput(self, userInput: Dict[str, Any], methodCatalog: Dict[str, Any]) -> Dict[str, Any]: - """Process user input with AI to extract objectives and actions""" - # Create prompt with available methods and actions - prompt = f"""Given the following user input and available methods/actions, extract the objective and required actions: -User Input: {userInput.get('message', '')} - -Available Methods and Actions: -{json.dumps(methodCatalog, indent=2)} - -Please provide a JSON response with: -1. objective: The main goal or task to accomplish -2. actions: List of required actions with method and parameters - -Example format: -{{ - "objective": "Search for documents about project X", - "actions": [ - {{ - "method": "sharepoint", - "action": "search", - "parameters": {{ - "query": "project X", - "site": "projects" - }} - }} - ] -}} -""" - - # Call AI service - response = await self._callAiBasic(prompt) - return json.loads(response) - - async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[AgentAction]: - """Create action objects from processed input""" - actions = [] - for actionData in actionsData: - method = self.service.getMethod(actionData['method']) - if not method: - continue - - action = AgentAction( - id=f"action_{datetime.now(UTC).timestamp()}", - method=actionData['method'], - action=actionData['action'], - parameters=actionData.get('parameters', {}), - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) - actions.append(action) - - return actions - - async def _summarizeWorkflow(self) -> str: - """Summarize workflow history""" - if not self.workflow.messages: - return "" - - prompt = f"""Summarize the following chat history: - {json.dumps([m.dict() for m in self.workflow.messages], indent=2)} - - Please provide a concise summary focusing on: - 1. Main objectives - 2. Key actions taken - 3. Current status - 4. Any issues or blockers - """ - - return await self._callAiBasic(prompt) - - async def _analyzeTaskResults(self, task: AgentTask) -> Dict[str, Any]: - """Analyze task results to determine next steps""" - # Get workflow summary - summary = await self._summarizeWorkflow() - - # Create prompt for analysis - prompt = f"""Analyze the following task results and workflow history to determine next steps: - -Task Results: -{json.dumps([a.dict() for a in task.actionList], indent=2)} - -Workflow Summary: -{summary} - -Please provide a JSON response with: -1. isComplete: Whether the workflow is complete -2. nextActions: List of next actions needed (if any) -3. issues: Any issues or blockers identified - -Example format: -{{ - "isComplete": false, - "nextActions": [ - {{ - "method": "sharepoint", - "action": "read", - "parameters": {{ - "documentId": "doc123" - }} - }} - ], - "issues": ["Need authentication for SharePoint"] -}} -""" - - response = await self._callAiBasic(prompt) - return json.loads(response) - - def _createNextTask(self, current_task: AgentTask, analysis: Dict[str, Any]) -> AgentTask: - """Create next task based on analysis""" - # Create actions for next task - actions = [] - for action_data in analysis.get('nextActions', []): - action = AgentAction( - id=f"action_{datetime.now(UTC).timestamp()}", - method=action_data['method'], - action=action_data['action'], - parameters=action_data.get('parameters', {}), - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) - actions.append(action) - - # Create and return next task - return AgentTask( - id=f"task_{datetime.now(UTC).timestamp()}", - workflowId=self.workflow.id, - userInput=current_task.userInput, - dataList=current_task.dataList, - actionList=actions, - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) - - async def processTask(self, task: AgentTask) -> Dict[str, Any]: - """Process a task with improved error handling and AI integration""" + async def identifyNextTask(self, workflow: ChatWorkflow) -> TaskResult: + """Identify the next task based on workflow state""" try: - # Execute task - await self.service.executeTask(task) + # Get workflow summary + summary = await self._summarizeWorkflow() - # Process results - if task.status == TaskStatus.COMPLETED: - # Generate feedback using AI - feedback = await self._processTaskResults(task) - task.thisTaskFeedback = feedback - - # Create output documents - documents = await self._createOutputDocuments(task) - task.documentsOutput = documents - - return { - "status": "success", - "feedback": feedback, - "documents": documents - } - else: - return { - "status": task.status, - "error": task.error, - "feedback": f"Task failed: {task.error}" - } - - except Exception as e: - logger.error(f"Error processing task: {str(e)}") - return { - "status": "error", - "error": str(e), - "feedback": f"Error processing task: {str(e)}" - } - - def _generateDocumentPrompt(self, task: str) -> str: - """Generate a prompt for document generation""" - return f"""Generate output documents for the following task: - -Task: {task} - -For each document you need to generate, provide a TaskDocument object with the following structure: -{{ - "filename": "string", # Filename with extension - "mimeType": "string", # MIME type of the file - "data": "string", # File content as text or base64 - "base64Encoded": boolean # True if data is base64 encoded -}} - -Rules: -1. For text files (txt, json, xml, etc.), provide content directly in the data field -2. For binary files (images, videos, etc.), encode content in base64 and set base64Encoded to true -3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf) -4. Include file extensions in filenames - -Return a JSON array of TaskDocument objects. -""" - - async def _processTaskResults(self, task: AgentTask) -> str: - """Process task results and generate feedback""" - try: - # Generate document prompt - docPrompt = self._generateDocumentPrompt(task.userInput) + # Generate prompt for next task + prompt = f"""Based on the workflow summary: + {summary} - # Get AI response for document generation - docResponse = await self._callAiBasic(docPrompt) + Determine what the next task should be. + Return a JSON object with: + - objective: The main goal or task to accomplish + - actions: List of required actions with method and parameters + """ - # Parse response into TaskDocument objects + # Get AI response + response = await self._callAiBasic(prompt) + + # Parse response try: - taskDocs = json.loads(docResponse) - task.documentsOutput = taskDocs - except json.JSONDecodeError as e: - logger.error(f"Error parsing document response: {str(e)}") - return f"Error processing results: {str(e)}" - - # Generate feedback - feedback = await self._callAiBasic( - f"""Generate feedback for the completed task: - Task: {task.userInput} - Generated Documents: {len(task.documentsOutput)} files - - Provide a concise summary of what was accomplished. - """ - ) - - return feedback - - except Exception as e: - logger.error(f"Error processing task results: {str(e)}") - return f"Error processing results: {str(e)}" - - async def _createOutputDocuments(self, task: AgentTask) -> List[ChatDocument]: - """Create output documents from task results""" - try: - fileIds = [] - - # Process each TaskDocument from AI output - for taskDoc in task.documentsOutput: - # Store file in database - fileItem = self.service.functions.createFile( - name=taskDoc.filename, - mimeType=taskDoc.mimeType + result = json.loads(response) + return TaskResult( + taskId=f"analysis_{datetime.now(UTC).timestamp()}", + status=TaskStatus.COMPLETED, + success=True, + timestamp=datetime.now(UTC), + data=result + ) + except json.JSONDecodeError as e: + logger.error(f"Error parsing AI response: {str(e)}") + return TaskResult( + taskId=f"analysis_{datetime.now(UTC).timestamp()}", + status=TaskStatus.FAILED, + success=False, + timestamp=datetime.now(UTC), + error=f"Error parsing AI response: {str(e)}" ) - - # Store file content - if taskDoc.base64Encoded: - # Decode base64 content - content = base64.b64decode(taskDoc.data) - else: - # Use text content directly - content = taskDoc.data.encode('utf-8') - - # Store file data - self.service.functions.createFileData(fileItem.id, content) - fileIds.append(fileItem.id) - - # Convert all files to ChatDocuments in one call - if fileIds: - return await self.service.chat.processFileIds(fileIds) - return [] except Exception as e: - logger.error(f"Error creating output documents: {str(e)}") - return [] - + logger.error(f"Error identifying next task: {str(e)}") + return TaskResult( + taskId=f"analysis_{datetime.now(UTC).timestamp()}", + status=TaskStatus.FAILED, + success=False, + timestamp=datetime.now(UTC), + error=f"Error identifying next task: {str(e)}" + ) + + async def callAi(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str: + """Enhanced AI service call with language support.""" + if not self.service or not self.service.base: + logger.error("AI service not set in ChatManager") + return "Error: AI service not available" + + # Add language instruction for user-facing responses + if produceUserAnswer and self.userLanguage: + ltext = f"Please respond in '{self.userLanguage}' language." + if messages and messages[0]["role"] == "system": + if "language" not in messages[0]["content"].lower(): + messages[0]["content"] = f"{ltext} {messages[0]['content']}" + else: + # Insert a system message with language instruction + messages.insert(0, { + "role": "system", + "content": ltext + }) + + # Call the AI service + if temperature is not None: + return await self.service.base.callAi(messages, temperature=temperature) + else: + return await self.service.base.callAi(messages) + + async def callAi4Image(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: + """Enhanced AI service call with language support.""" + if not self.service or not self.service.base: + logger.error("AI service not set in ChatManager") + return "Error: AI service not available" + return await self.service.base.analyzeImage(imageData, mimeType, prompt) + async def _callAiBasic(self, prompt: str) -> str: """Call basic AI service""" try: if not self.service or not self.service.base: raise ValueError("Service or base interface not initialized") - return await self.service.base.callAi([ + return await self.callAi([ {"role": "system", "content": "You are an AI assistant that helps process user requests."}, {"role": "user", "content": prompt} ]) @@ -461,6 +266,7 @@ Return a JSON array of TaskDocument objects. # Get user language from workflow mandate userLanguage = workflow.mandateId.split('_')[0] if workflow.mandateId else 'en' + self.setUserLanguage(userLanguage) # Prepare messages for AI context messages = [ @@ -490,10 +296,431 @@ Return a JSON array of TaskDocument objects. }) # Generate feedback using AI - feedback = await self.service.aiService.callApi(messages, temperature=0.7) + feedback = await self.callAi(messages, produceUserAnswer=True, temperature=0.7) return feedback except Exception as e: logger.error(f"Error generating workflow feedback: {str(e)}") - return "Workflow completed successfully." \ No newline at end of file + return "Workflow completed successfully." + + def _generatePrompt(self, task: str, document: ChatDocument, examples: List[Dict[str, str]] = None) -> str: + """Generate a prompt based on task and document""" + try: + # Create base prompt + prompt = f"""Task: {task} +Document: {document.filename} ({document.mimeType}) + +""" + + # Add examples if provided + if examples: + prompt += "\nExamples:\n" + for example in examples: + prompt += f"Input: {example.get('input', '')}\n" + prompt += f"Output: {example.get('output', '')}\n\n" + + return prompt + + except Exception as e: + logger.error(f"Error generating prompt: {str(e)}") + return "" + + # ===== Task Execution and Processing ===== + async def executeTask(self, task: TaskItem) -> TaskResult: + """Execute a task and return its result""" + try: + # Create result object + result = TaskResult( + taskId=task.id, + status=task.status, + success=True, + timestamp=datetime.now(UTC) + ) + + # Start timing + startTime = datetime.now(UTC) + + # Execute each action + for action in task.actionList: + try: + # Execute action + actionResult = await action.execute() + + # Update action status + action.status = actionResult.status + if actionResult.error: + action.error = actionResult.error + + except Exception as e: + logger.error(f"Action execution error: {str(e)}") + action.status = TaskStatus.FAILED + action.error = str(e) + + # Calculate processing time + endTime = datetime.now(UTC) + result.processingTime = (endTime - startTime).total_seconds() + + # Update task status + if all(action.status == TaskStatus.COMPLETED for action in task.actionList): + result.status = TaskStatus.COMPLETED + result.success = True + else: + result.status = TaskStatus.FAILED + result.success = False + result.error = "One or more actions failed" + + # Generate feedback and documents if task completed successfully + if result.status == TaskStatus.COMPLETED: + # Generate feedback using AI + result.feedback = await self._processTaskResults(task) + + # Create output documents + result.documents = await self._createOutputDocuments(task) + else: + result.feedback = f"Task failed: {result.error}" + + return result + + except Exception as e: + logger.error(f"Task execution error: {str(e)}") + raise + + async def parseTaskResult(self, workflow: ChatWorkflow, result: TaskResult) -> None: + """Process and store task result in workflow""" + try: + # Find task in workflow + task = next((t for t in workflow.tasks if t.id == result.taskId), None) + if not task: + logger.error(f"Task {result.taskId} not found in workflow") + return + + # Update task status + task.status = result.status + if result.error: + task.error = result.error + + # Create feedback message if available + if result.feedback: + message = ChatMessage( + id=str(uuid.uuid4()), + workflowId=workflow.id, + role="assistant", + message=result.feedback, + status="step", + documents=result.documents + ) + workflow.messages.append(message) + + # Update workflow stats + if result.processingTime: + if not workflow.stats: + workflow.stats = ChatStat() + workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime + + except Exception as e: + logger.error(f"Error parsing task result: {str(e)}") + raise + + async def shouldContinue(self, workflow: ChatWorkflow) -> bool: + """Determine if workflow should continue""" + try: + # Check if workflow is in a terminal state + if workflow.status in ["completed", "failed", "stopped"]: + return False + + # Check if there are any pending tasks + hasPendingTasks = any(t.status == TaskStatus.PENDING for t in workflow.tasks) + if not hasPendingTasks: + return False + + # Check if any task is currently running + hasRunningTasks = any(t.status == TaskStatus.RUNNING for t in workflow.tasks) + if hasRunningTasks: + return True + + return False + + except Exception as e: + logger.error(f"Error checking workflow continuation: {str(e)}") + return False + + async def _summarizeWorkflow(self) -> str: + """Summarize workflow history""" + if not self.workflow.messages: + return "" + + prompt = f"""Summarize the following chat history: + {json.dumps([m.dict() for m in self.workflow.messages], indent=2)} + + Please provide a concise summary focusing on: + 1. Main objectives + 2. Key actions taken + 3. Current status + 4. Any issues or blockers + """ + + return await self._callAiBasic(prompt) + + async def _analyzeTaskResults(self, task: TaskItem) -> Dict[str, Any]: + """Analyze task results to determine next steps""" + # Get workflow summary + summary = await self._summarizeWorkflow() + + # Generate prompt for analysis + prompt = f"""Based on the workflow summary and task results: + {summary} + + Task: {task.userInput} + Status: {task.status} + Error: {task.error if task.error else 'None'} + + Determine if the workflow is complete and what the next steps should be. + Return a JSON object with: + - isComplete: boolean + - objective: string + - nextActions: array of action objects + """ + + # Get AI response + response = await self._callAiBasic(prompt) + + # Parse response + return json.loads(response) + + def _promptInstructions(self, methodCatalog: Dict[str, Any], isInitialTask: bool = False) -> str: + """Generate common prompt instructions for task analysis""" + instructions = f"""Available Methods and Actions: +{json.dumps(methodCatalog, indent=2)} + +""" + if isInitialTask: + instructions += """Please provide a JSON response with: +1. objective: The main goal or task to accomplish +2. actions: List of required actions with method and parameters + +Example format: +{ + "objective": "Search for documents about project X", + "actions": [ + { + "method": "sharepoint", + "action": "search", + "parameters": { + "query": "project X", + "site": "projects" + } + } + ] +}""" + else: + instructions += """Please provide a JSON response with: +1. isComplete: Whether the workflow is complete +2. nextActions: List of next actions needed (if any) +3. issues: Any issues or blockers identified + +Example format: +{ + "isComplete": false, + "nextActions": [ + { + "method": "sharepoint", + "action": "read", + "parameters": { + "documentId": "doc123" + } + } + ], + "issues": ["Need authentication for SharePoint"] +} + +Note: Only use methods and actions that are available in the method catalog above.""" + + return instructions + + async def _processUserInput(self, userInput: Dict[str, Any], methodCatalog: Dict[str, Any]) -> Dict[str, Any]: + """Process user input with AI to extract objectives and actions""" + # Create prompt with available methods and actions + prompt = f"""Given the following user input and available methods/actions, extract the objective and required actions: + +User Input: {userInput.get('message', '')} + +{self._promptInstructions(methodCatalog, isInitialTask=True)}""" + + # Call AI service + response = await self._callAiBasic(prompt) + return json.loads(response) + + async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskItem]: + """Create action objects from processed input""" + actions = [] + for actionData in actionsData: + try: + # Validate required fields + if not all(k in actionData for k in ['method', 'action']): + logger.warning(f"Skipping invalid action data: {actionData}") + continue + + action = TaskItem( + id=f"action_{datetime.now(UTC).timestamp()}", + method=actionData['method'], + action=actionData['action'], + parameters=actionData.get('parameters', {}), + status=TaskStatus.PENDING, + retryCount=0, + retryMax=actionData.get('retryMax', 3) + ) + actions.append(action) + + except Exception as e: + logger.error(f"Error creating action: {str(e)}") + continue + + return actions + + async def _processTaskResults(self, task: TaskItem) -> str: + """Process task results and generate feedback""" + try: + # Generate document prompt + docPrompt = self._generateDocumentPrompt(task.userInput) + + # Get AI response for document generation + docResponse = await self._callAiBasic(docPrompt) + + # Parse response into TaskDocument objects + try: + taskDocs = json.loads(docResponse) + task.documentsOutput = taskDocs + except json.JSONDecodeError as e: + logger.error(f"Error parsing document response: {str(e)}") + return f"Error processing results: {str(e)}" + + # Generate feedback + feedback = await self._callAiBasic( + f"""Generate feedback for the completed task: + Task: {task.userInput} + Generated Documents: {len(task.documentsOutput)} files + + Provide a concise summary of what was accomplished. + """ + ) + + return feedback + + except Exception as e: + logger.error(f"Error processing task results: {str(e)}") + return f"Error processing results: {str(e)}" + + async def _createOutputDocuments(self, task: TaskItem) -> List[ChatDocument]: + """Create output documents from task results""" + try: + fileIds = [] + + # Process each TaskDocument from AI output + for taskDoc in task.documentsOutput: + # Store file in database + fileItem = self.service.functions.createFile( + name=taskDoc.filename, + mimeType=taskDoc.mimeType + ) + + # Store file content + if taskDoc.base64Encoded: + # Decode base64 content + content = base64.b64decode(taskDoc.data) + else: + # Use text content directly + content = taskDoc.data.encode('utf-8') + + # Store file data + self.service.functions.createFileData(fileItem.id, content) + fileIds.append(fileItem.id) + + # Convert all files to ChatDocuments in one call + if fileIds: + return await self.processFileIds(fileIds) + return [] + + except Exception as e: + logger.error(f"Error creating output documents: {str(e)}") + return [] + + async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: + """Process multiple files and extract their contents.""" + documents = [] + for fileId in fileIds: + # Get file metadata + fileMetadata = self.service.functions.getFile(fileId) + if not fileMetadata: + logger.warning(f"File metadata not found for {fileId}") + continue + + # Create ChatDocument + document = ChatDocument( + id=str(uuid.uuid4()), + fileId=fileId, + filename=fileMetadata.get("name", "Unknown"), + fileSize=fileMetadata.get("size", 0), + mimeType=fileMetadata.get("mimeType", "text/plain") + ) + + documents.append(document) + return documents + + async def addTaskResult(self, workflow: ChatWorkflow, result: TaskResult) -> None: + """Add task result to workflow and update status""" + try: + # Find task in workflow + task = next((t for t in workflow.tasks if t.id == result.taskId), None) + if not task: + logger.error(f"Task {result.taskId} not found in workflow") + return + + # Update task status + task.status = result.status + if result.error: + task.error = result.error + + # Create feedback message if available + if result.feedback: + message = ChatMessage( + id=str(uuid.uuid4()), + workflowId=workflow.id, + role="assistant", + message=result.feedback, + status="step", + documents=result.documents + ) + workflow.messages.append(message) + + # Update workflow stats + if result.processingTime: + if not workflow.stats: + workflow.stats = ChatStat() + workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime + + except Exception as e: + logger.error(f"Error adding task result: {str(e)}") + + def _generateDocumentPrompt(self, task: str) -> str: + """Generate a prompt for document generation""" + return f"""Generate output documents for the following task: + +Task: {task} + +For each document you need to generate, provide a TaskDocument object with the following structure: +{{ + "filename": "string", # Filename with extension + "mimeType": "string", # MIME type of the file + "data": "string", # File content as text or base64 + "base64Encoded": boolean # True if data is base64 encoded +}} + +Rules: +1. For text files (txt, json, xml, etc.), provide content directly in the data field +2. For binary files (images, videos, etc.), encode content in base64 and set base64Encoded to true +3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf) +4. Include file extensions in filenames + +Return a JSON array of TaskDocument objects. +""" \ No newline at end of file diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py index 79e215c9..30cb8194 100644 --- a/modules/workflow/managerDocument.py +++ b/modules/workflow/managerDocument.py @@ -4,8 +4,6 @@ Document Manager Module for handling document operations and content extraction. import base64 import logging -from typing import List, Optional, Dict, Any, Union -from pathlib import Path import uuid from modules.interfaces.serviceChatModel import ( diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py index f035adef..b0dcedf8 100644 --- a/modules/workflow/managerWorkflow.py +++ b/modules/workflow/managerWorkflow.py @@ -3,9 +3,12 @@ import logging from datetime import datetime, UTC import uuid +from modules.interfaces.serviceAppClass import User + from modules.interfaces.serviceChatModel import ( - AgentTask, AgentResult, TaskStatus, ChatMessage, - UserInputRequest, ChatWorkflow, ChatDocument + TaskStatus, UserInputRequest, ContentMetadata, ContentItem, + ChatDocument, TaskDocument, ExtractedContent, TaskItem, + TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) from modules.interfaces.serviceChatClass import ChatInterface from modules.workflow.managerChat import ChatManager @@ -17,131 +20,111 @@ class WorkflowStoppedException(Exception): pass class WorkflowManager: - """Manages workflow execution lifecycle""" + """Manager for workflow processing and coordination""" - def __init__(self, chatInterface: ChatInterface): - self.workflow = None - self.isRunning = False + def __init__(self, chatInterface: ChatInterface, currentUser: User): self.chatInterface = chatInterface - self.chatManager = ChatManager() - + self.chatManager = ChatManager(currentUser) + self.currentUser = currentUser + def _checkWorkflowStopped(self, workflow: ChatWorkflow) -> None: + """Check if workflow has been stopped""" if workflow.status == "stopped": - logger.info(f"Workflow {workflow.id} stopped by user") - raise WorkflowStoppedException("User stopped workflow") - + raise WorkflowStoppedException("Workflow was stopped by user") + async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> None: - """Main workflow execution process""" + """Process a workflow with user input""" try: - self.workflow = workflow - self.isRunning = True + # Initialize chat manager + await self.chatManager.initialize(workflow) - # Process documents from userInput using ChatInterface's method - documents = [] - if userInput.listFileId: - documents = await self.chatInterface.processFileIds(userInput.listFileId) + # Set user language + self.chatManager.setUserLanguage(userInput.userLanguage) - # Create initial ChatMessage from userInput - initialMessage = ChatMessage( + # Send first message + message = await self._sendFirstMessage(userInput, workflow) + + # Create initial task + task = await self.chatManager.createInitialTask(workflow, message) + + # Process workflow + while True: + # Check if workflow is stopped + self._checkWorkflowStopped(workflow) + + # Execute task + result = await self.chatManager.executeTask(task) + + # Process result + await self.chatManager.parseTaskResult(workflow, result) + + # Check if workflow should continue + if not await self.chatManager.shouldContinue(workflow): + break + + # Identify next task + nextTaskResult = await self.chatManager.identifyNextTask(workflow) + + # Create next task + task = await self.chatManager.createNextTask(workflow, nextTaskResult) + if not task: + break + + # Send last message + await self._sendLastMessage(workflow) + + except WorkflowStoppedException: + logger.info("Workflow stopped by user") + except Exception as e: + logger.error(f"Workflow processing error: {str(e)}") + raise + + async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage: + """Send first message to start workflow""" + try: + # Create initial message + message = ChatMessage( id=str(uuid.uuid4()), workflowId=workflow.id, role="user", message=userInput.prompt, - status="first", # First message in workflow - documents=documents + status="first", + sequenceNr=1, + publishedAt=datetime.now(UTC).isoformat() ) + # Add documents if any + if userInput.listFileId: + message.documents = await self.chatManager.processFileIds(userInput.listFileId) + # Add message to workflow - await self.chatInterface.createWorkflowMessage(initialMessage.dict()) + workflow.messages.append(message) + return message - # Create initial task - task = await self.chatInterface.createInitialTask(workflow, initialMessage) - if not task: - logger.error("Failed to create initial task") - workflow.status = "error" - workflow.error = "Failed to create initial task" - return + except Exception as e: + logger.error(f"Error sending first message: {str(e)}") + raise - # Main workflow loop - while self.isRunning and workflow.status == "running": - - self._checkWorkflowStopped(workflow) - - # Execute task - result = AgentResult( - id=task.id, - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) - - # Execute each action - for action in task.actionList: - - self._checkWorkflowStopped(workflow) - - try: - # Execute action - actionResult = await action.execute() - - # Update action status - action.status = TaskStatus.COMPLETED if actionResult.success else TaskStatus.FAILED - action.result = actionResult - - # Check for failure - if not actionResult.success: - result.status = TaskStatus.FAILED - result.error = actionResult.error - break - - except Exception as e: - logger.error(f"Action error: {str(e)}") - action.status = TaskStatus.FAILED - result.status = TaskStatus.FAILED - result.error = str(e) - break - - # Update result status - if result.status != TaskStatus.FAILED: - result.status = TaskStatus.COMPLETED - - result.updatedAt = datetime.now(UTC) - - self._checkWorkflowStopped(workflow) - - # Update workflow with result - await self.chatInterface.addTaskResult(workflow, result) - - # Get next task - task = await self.chatInterface.getNextTask(workflow) - if not task: - break - - # Check if should continue - if not await self.chatInterface.shouldContinue(workflow): - break + async def _sendLastMessage(self, workflow: ChatWorkflow) -> None: + """Send last message to complete workflow""" + try: + # Generate feedback + feedback = await self.chatManager.generateWorkflowFeedback(workflow) - # Generate final feedback message using ChatManager - finalFeedback = await self.chatManager.generateWorkflowFeedback(workflow) - - # Create final message with "last" status - self._checkWorkflowStopped(workflow) - finalMessage = ChatMessage( + # Create last message + message = ChatMessage( id=str(uuid.uuid4()), workflowId=workflow.id, role="assistant", - message=finalFeedback, - status="last" # Last message in workflow + message=feedback, + status="last", + sequenceNr=len(workflow.messages) + 1, + publishedAt=datetime.now(UTC).isoformat() ) - await self.chatInterface.createWorkflowMessage(finalMessage.dict()) - # Complete workflow - if workflow.status != "failed": - workflow.status = "completed" - workflow.lastActivity = datetime.now(UTC).isoformat() + # Add message to workflow + workflow.messages.append(message) except Exception as e: - logger.error(f"Workflow error: {str(e)}") - if self.workflow: - self.workflow.status = "error" - self.workflow.error = str(e) \ No newline at end of file + logger.error(f"Error sending last message: {str(e)}") + raise \ No newline at end of file diff --git a/modules/workflow/serviceContainer.py b/modules/workflow/serviceContainer.py index 7ac368ee..5b5f0a2f 100644 --- a/modules/workflow/serviceContainer.py +++ b/modules/workflow/serviceContainer.py @@ -6,15 +6,20 @@ import asyncio from modules.shared.configuration import APP_CONFIG from modules.methods import MethodBase, MethodResult -from modules.interfaces.serviceChatModel import AgentTask, AgentAction, AgentResult, Action, TaskStatus, ActionStatus +from modules.interfaces.serviceChatModel import ( + TaskStatus, UserInputRequest, ContentMetadata, ContentItem, + ChatDocument, TaskDocument, ExtractedContent, TaskItem, + TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow +) from modules.interfaces.serviceManagementClass import ServiceManagement +from modules.interfaces.serviceChatClass import ChatInterface logger = logging.getLogger(__name__) class ServiceContainer: """Service container for dependency injection and service management.""" - def __init__(self): + def __init__(self, chatInterface: ChatInterface): self.methods = {} self.context = {} self.workflow = None @@ -29,7 +34,7 @@ class ServiceContainer: 'lastError': None, 'lastErrorTime': None } - self.tasks: Dict[str, Any] = {} # Will be populated with AgentTask instances + self.tasks: Dict[str, TaskItem] = {} # Will be populated with TaskItem instances # Initialize service management self.serviceManagement = ServiceManagement() diff --git a/notes/changelog.txt b/notes/changelog.txt index 4a8acd72..057f9b92 100644 --- a/notes/changelog.txt +++ b/notes/changelog.txt @@ -1,8 +1,5 @@ -Clean - -- ServiceContainer to clean for used functions, no spare! -- all the definitions used in serviceChatModel? -- all AI calls to route over AI-Module (AI basic, ai special, ai...) +- all AI calls to route over AI-Module (AI basic, ai special, ai...) and to streamline +- service object to define correctly