diff --git a/modules/connectors/connectorAiAnthropic.py b/modules/connectors/connectorAiAnthropic.py index b1e435ee..9f452790 100644 --- a/modules/connectors/connectorAiAnthropic.py +++ b/modules/connectors/connectorAiAnthropic.py @@ -137,7 +137,11 @@ class ChatService: textParts.append(part) if textParts: - textParts[0]["text"] = systemContent + textParts[0].get("text", "") + # Create a new text part with combined content + textParts[0] = { + "type": "text", + "text": systemContent + textParts[0].get("text", "") + } # Anthropic only supports "user" and "assistant" roles if role not in ["user", "assistant"]: diff --git a/modules/connectors/connectorAiOpenai.py b/modules/connectors/connectorAiOpenai.py index ad79f445..1014137d 100644 --- a/modules/connectors/connectorAiOpenai.py +++ b/modules/connectors/connectorAiOpenai.py @@ -18,7 +18,7 @@ def loadConfigData(): "maxTokens": int(APP_CONFIG.get('Connector_AiOpenai_MAX_TOKENS')) } -class ChatService: +class AiConnector: """Connector for communication with the OpenAI API.""" def __init__(self): @@ -38,7 +38,7 @@ class ChatService: ) logger.info(f"OpenAI Connector initialized with model: {self.modelName}") - async def callApi(self, messages: List[Dict[str, Any]], temperature: float = None, maxTokens: int = None) -> str: + async def callAiBasic(self, messages: List[Dict[str, Any]], temperature: float = None, maxTokens: int = None) -> str: """ Calls the OpenAI API with the given messages. @@ -85,11 +85,7 @@ class ChatService: logger.error(f"Error calling OpenAI API: {str(e)}") raise HTTPException(status_code=500, detail=f"Error calling OpenAI API: {str(e)}") - async def close(self): - """Closes the HTTP client when the application exits""" - await self.httpClient.aclose() - - async def analyzeImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: + async def callAiImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: """ Analyzes an image with the OpenAI Vision API. diff --git a/modules/connectors/connectorDbJson.py b/modules/connectors/connectorDbJson.py index fd38ffcc..0d6658ae 100644 --- a/modules/connectors/connectorDbJson.py +++ b/modules/connectors/connectorDbJson.py @@ -1,6 +1,6 @@ import json import os -from typing import List, Dict, Any, Optional, Union +from typing import List, Dict, Any, Optional, Union, TypedDict import logging from datetime import datetime import uuid @@ -10,6 +10,10 @@ from modules.shared.attributeUtils import to_dict logger = logging.getLogger(__name__) +class TableCache(TypedDict): + """Type definition for table cache entries""" + recordIds: List[str] + class DatabaseConnector: """ A connector for JSON-based data storage. @@ -31,8 +35,8 @@ class DatabaseConnector: os.makedirs(self.dbFolder, exist_ok=True) # Cache for loaded data - self._tablesCache = {} - self._tableMetadataCache = {} # Cache for table metadata (record IDs, etc.) + self._tablesCache: Dict[str, List[Dict[str, Any]]] = {} + self._tableMetadataCache: Dict[str, TableCache] = {} # Cache for table metadata (record IDs, etc.) # Initialize system table self._systemTableName = "_system" @@ -91,7 +95,7 @@ class DatabaseConnector: """Returns the full path to a table folder""" return os.path.join(self.dbFolder, table) - def _getRecordPath(self, table: str, recordId: Union[str, int]) -> str: + def _getRecordPath(self, table: str, recordId: str) -> str: """Returns the full path to a record file""" return os.path.join(self._getTablePath(table), f"{recordId}.json") @@ -134,7 +138,7 @@ class DatabaseConnector: return metadata - def _loadRecord(self, table: str, recordId: Union[str, int]) -> Optional[Dict[str, Any]]: + def _loadRecord(self, table: str, recordId: str) -> Optional[Dict[str, Any]]: """Loads a single record from the table.""" recordPath = self._getRecordPath(table, recordId) try: @@ -521,4 +525,4 @@ class DatabaseConnector: def getAllInitialIds(self) -> Dict[str, str]: """Returns all registered initial IDs.""" systemData = self._loadSystemTable() - return systemData.copy() # Return a copy to protect the original + return systemData.copy() # Return a copy to protect the original \ No newline at end of file diff --git a/modules/interfaces/interfaceAi.py b/modules/interfaces/interfaceAi.py new file mode 100644 index 00000000..5c19ffb0 --- /dev/null +++ b/modules/interfaces/interfaceAi.py @@ -0,0 +1,38 @@ +import logging +from typing import Dict, Any, List, Union +from modules.connectors.connectorAiOpenai import AiConnector + +logger = logging.getLogger(__name__) + +class AiInterface: + """Interface for AI service interactions""" + + def __init__(self): + self.service = AiConnector() + + async def callAiBasic(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str: + """Enhanced AI service call with language support.""" + + # Add language instruction for user-facing responses + if produceUserAnswer and hasattr(self, 'userLanguage') and self.userLanguage: + ltext = f"Please respond in '{self.userLanguage}' language." + if messages and messages[0]["role"] == "system": + if "language" not in messages[0]["content"].lower(): + messages[0]["content"] = f"{ltext} {messages[0]['content']}" + else: + # Insert a system message with language instruction + messages.insert(0, { + "role": "system", + "content": ltext + }) + + # Call the AI service + return await self.service.callAiBasic(messages, temperature=temperature) + + async def callAiImage(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: + """Enhanced AI service call with language support.""" + if not self.service: + logger.error("AI service not set in AiInterface") + return "Error: AI service not available" + return await self.service.callAiImage(imageData, mimeType, prompt) + diff --git a/modules/interfaces/serviceAppAccess.py b/modules/interfaces/serviceAppAccess.py index 9ecac84c..a7b301f8 100644 --- a/modules/interfaces/serviceAppAccess.py +++ b/modules/interfaces/serviceAppAccess.py @@ -66,7 +66,8 @@ class AppAccess: filtered_records = recordset elif self.privilege == UserPrivilege.ADMIN: # Admin sees connections for users in their mandate - user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})] + users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId}) + user_ids: List[str] = [str(u["id"]) for u in users] filtered_records = [r for r in recordset if r.get("userId") in user_ids] else: # Regular users only see their own connections @@ -114,7 +115,8 @@ class AppAccess: record["_hideDelete"] = False # Admin can edit/delete connections for users in their mandate elif self.privilege == UserPrivilege.ADMIN: - user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})] + users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId}) + user_ids: List[str] = [str(u["id"]) for u in users] record["_hideEdit"] = record.get("userId") not in user_ids record["_hideDelete"] = record.get("userId") not in user_ids # Regular users can only edit/delete their own connections @@ -167,7 +169,7 @@ class AppAccess: # Check specific record permissions if recordId is not None: # Get the record to check ownership - records = self.db.getRecordset(table, recordFilter={"id": recordId}) + records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": str(recordId)}) if not records: return False @@ -177,7 +179,8 @@ class AppAccess: if table == "connections": # Admin can modify connections for users in their mandate if self.privilege == UserPrivilege.ADMIN: - user_ids = [u["id"] for u in self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId})] + users: List[Dict[str, Any]] = self.db.getRecordset("users", recordFilter={"mandateId": self.mandateId}) + user_ids: List[str] = [str(u["id"]) for u in users] return record.get("userId") in user_ids # Users can only modify their own connections return record.get("userId") == self.userId @@ -213,7 +216,7 @@ class AppAccess: """ try: # Get session - sessions = self.db.getRecordset("sessions", recordFilter={"id": sessionId}) + sessions: List[Dict[str, Any]] = self.db.getRecordset("sessions", recordFilter={"id": sessionId}) if not sessions: return False diff --git a/modules/interfaces/serviceChatAccess.py b/modules/interfaces/serviceChatAccess.py index 12e37814..cd9428ff 100644 --- a/modules/interfaces/serviceChatAccess.py +++ b/modules/interfaces/serviceChatAccess.py @@ -94,7 +94,7 @@ class ChatAccess: # For regular users and admins, check specific cases if recordId is not None: # Get the record to check ownership - records = self.db.getRecordset(table, recordFilter={"id": recordId}) + records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": recordId}) if not records: return False diff --git a/modules/interfaces/serviceChatClass.py b/modules/interfaces/serviceChatClass.py index 0b80150e..eb730e58 100644 --- a/modules/interfaces/serviceChatClass.py +++ b/modules/interfaces/serviceChatClass.py @@ -14,9 +14,7 @@ import asyncio from modules.interfaces.serviceChatAccess import ChatAccess from modules.interfaces.serviceChatModel import ( - TaskStatus, UserInputRequest, ContentMetadata, ContentItem, - ChatDocument, TaskDocument, ExtractedContent, TaskItem, - TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow + TaskStatus, UserInputRequest, ChatDocument, TaskItem, ChatStat, ChatLog, ChatMessage, ChatWorkflow, TaskAction ) from modules.interfaces.serviceAppModel import User @@ -846,6 +844,186 @@ class ChatInterface: logger.error(f"Error stopping workflow: {str(e)}") raise + # Task Management + + def getTask(self, taskId: str) -> Optional[TaskItem]: + """Returns a task by ID if user has access.""" + tasks = self.db.getRecordset("tasks", recordFilter={"id": taskId}) + if not tasks: + return None + + filteredTasks = self._uam("tasks", tasks) + if not filteredTasks: + return None + + task = filteredTasks[0] + try: + # Validate task data against TaskItem model + return TaskItem( + id=task["id"], + workflowId=task["workflowId"], + status=task.get("status", TaskStatus.PENDING), + error=task.get("error"), + startedAt=task.get("startedAt"), + finishedAt=task.get("finishedAt"), + actionList=[TaskAction(**action) for action in task.get("actionList", [])], + documentsOutput=task.get("documentsOutput", []), + retryCount=task.get("retryCount", 0), + retryMax=task.get("retryMax", 3), + rollbackOnFailure=task.get("rollbackOnFailure", True), + dependencies=task.get("dependencies", []), + feedback=task.get("feedback") + ) + except Exception as e: + logger.error(f"Error validating task data: {str(e)}") + return None + + def getWorkflowTasks(self, workflowId: str) -> List[TaskItem]: + """Returns tasks for a workflow if user has access to the workflow.""" + # Check workflow access first + workflow = self.getWorkflow(workflowId) + if not workflow: + return [] + + # Get tasks for this workflow + tasks = self.db.getRecordset("tasks", recordFilter={"workflowId": workflowId}) + return [TaskItem(**task) for task in self._uam("tasks", tasks)] + + def createTask(self, taskData: Dict[str, Any]) -> TaskItem: + """Creates a new task if user has access to the workflow.""" + try: + # Check workflow access + workflowId = taskData.get("workflowId") + if not workflowId: + logger.error("No workflowId provided for createTask") + return None + + workflow = self.getWorkflow(workflowId) + if not workflow: + logger.warning(f"No access to workflow {workflowId}") + return None + + if not self._canModify("workflows", workflowId): + logger.warning(f"No permission to modify workflow {workflowId}") + return None + + # Ensure required fields + if "id" not in taskData: + taskData["id"] = f"task_{uuid.uuid4()}" + + if "status" not in taskData: + taskData["status"] = TaskStatus.PENDING + + if "startedAt" not in taskData: + taskData["startedAt"] = self._getCurrentTimestamp() + + # Create task in database + createdTask = self.db.recordCreate("tasks", taskData) + + # Convert to TaskItem model + task = TaskItem( + id=createdTask["id"], + workflowId=createdTask["workflowId"], + status=createdTask.get("status", TaskStatus.PENDING), + error=createdTask.get("error"), + startedAt=createdTask.get("startedAt"), + finishedAt=createdTask.get("finishedAt"), + actionList=[TaskAction(**action) for action in createdTask.get("actionList", [])], + documentsOutput=createdTask.get("documentsOutput", []), + retryCount=createdTask.get("retryCount", 0), + retryMax=createdTask.get("retryMax", 3), + rollbackOnFailure=createdTask.get("rollbackOnFailure", True), + dependencies=createdTask.get("dependencies", []), + feedback=createdTask.get("feedback") + ) + + # Update workflow's task list + workflowTasks = workflow.get("tasks", []) + if task.id not in workflowTasks: + workflowTasks.append(task.id) + self.updateWorkflow(workflowId, {"tasks": workflowTasks}) + + return task + + except Exception as e: + logger.error(f"Error creating task: {str(e)}") + return None + + def updateTask(self, taskId: str, taskData: Dict[str, Any]) -> TaskItem: + """Updates a task if user has access to the workflow.""" + try: + # Get existing task + task = self.getTask(taskId) + if not task: + logger.warning(f"Task {taskId} not found") + return None + + # Check workflow access + workflow = self.getWorkflow(task.workflowId) + if not workflow: + logger.warning(f"No access to workflow {task.workflowId}") + return None + + if not self._canModify("workflows", task.workflowId): + logger.warning(f"No permission to modify workflow {task.workflowId}") + return None + + # Update task in database + updatedTask = self.db.recordModify("tasks", taskId, taskData) + + # Convert to TaskItem model + return TaskItem( + id=updatedTask["id"], + workflowId=updatedTask["workflowId"], + status=updatedTask.get("status", task.status), + error=updatedTask.get("error", task.error), + startedAt=updatedTask.get("startedAt", task.startedAt), + finishedAt=updatedTask.get("finishedAt", task.finishedAt), + actionList=[TaskAction(**action) for action in updatedTask.get("actionList", task.actionList)], + documentsOutput=updatedTask.get("documentsOutput", task.documentsOutput), + retryCount=updatedTask.get("retryCount", task.retryCount), + retryMax=updatedTask.get("retryMax", task.retryMax), + rollbackOnFailure=updatedTask.get("rollbackOnFailure", task.rollbackOnFailure), + dependencies=updatedTask.get("dependencies", task.dependencies), + feedback=updatedTask.get("feedback", task.feedback) + ) + + except Exception as e: + logger.error(f"Error updating task: {str(e)}") + return None + + def deleteTask(self, taskId: str) -> bool: + """Deletes a task if user has access to the workflow.""" + try: + # Get existing task + task = self.getTask(taskId) + if not task: + logger.warning(f"Task {taskId} not found") + return False + + # Check workflow access + workflow = self.getWorkflow(task.workflowId) + if not workflow: + logger.warning(f"No access to workflow {task.workflowId}") + return False + + if not self._canModify("workflows", task.workflowId): + logger.warning(f"No permission to modify workflow {task.workflowId}") + return False + + # Delete task + if self.db.recordDelete("tasks", taskId): + # Update workflow's task list + workflowTasks = workflow.get("tasks", []) + if taskId in workflowTasks: + workflowTasks.remove(taskId) + self.updateWorkflow(task.workflowId, {"tasks": workflowTasks}) + return True + return False + + except Exception as e: + logger.error(f"Error deleting task: {str(e)}") + return False def getInterface(currentUser: Optional[User] = None) -> 'ChatInterface': """ diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py index 1d65d26e..f81a882b 100644 --- a/modules/interfaces/serviceChatModel.py +++ b/modules/interfaces/serviceChatModel.py @@ -3,13 +3,36 @@ Chat model classes for the chat system. """ from pydantic import BaseModel, Field -from typing import List, Dict, Any, Optional, Union +from typing import List, Dict, Any, Optional from datetime import datetime, UTC import uuid from enum import Enum from modules.shared.attributeUtils import register_model_labels, ModelMixin +# ===== Method Models ===== + +class MethodResult(BaseModel, ModelMixin): + """Model for method results""" + success: bool = Field(description="Whether the method execution was successful") + data: Dict[str, Any] = Field(description="Result data") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") + validation: List[str] = Field(default_factory=list, description="Validation messages") + error: Optional[str] = Field(None, description="Error message if any") + +# Register labels for MethodResult +register_model_labels( + "MethodResult", + {"en": "Method Result", "fr": "Résultat de méthode"}, + { + "success": {"en": "Success", "fr": "Succès"}, + "data": {"en": "Data", "fr": "Données"}, + "metadata": {"en": "Metadata", "fr": "Métadonnées"}, + "validation": {"en": "Validation", "fr": "Validation"}, + "error": {"en": "Error", "fr": "Erreur"} + } +) + # ===== Base Enums and Simple Models ===== class TaskStatus(str, Enum): @@ -121,31 +144,9 @@ register_model_labels( } ) -class TaskDocument(BaseModel, ModelMixin): - """Data model for a task document""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - data: str = Field(description="Base64 encoded file data") - filename: str = Field(description="Name of the file") - fileSize: int = Field(description="Size of the file") - mimeType: str = Field(description="MIME type of the file") - -# Register labels for TaskDocument -register_model_labels( - "TaskDocument", - {"en": "Task Document", "fr": "Document de tâche"}, - { - "id": {"en": "ID", "fr": "ID"}, - "filename": {"en": "Filename", "fr": "Nom de fichier"}, - "fileSize": {"en": "File Size", "fr": "Taille du fichier"}, - "mimeType": {"en": "MIME Type", "fr": "Type MIME"}, - "data": {"en": "Data", "fr": "Données"} - } -) - class ExtractedContent(BaseModel, ModelMixin): """Data model for extracted content""" - objectId: str = Field(description="Reference to source document") - objectType: str = Field(description="Type of source object ('ChatDocument' or 'TaskDocument')") + id: str = Field(description="Reference to source ChatDocument") contents: List[ContentItem] = Field(default_factory=list, description="List of content items") # Register labels for ExtractedContent @@ -161,6 +162,61 @@ register_model_labels( # ===== Task Models ===== +class TaskAction(BaseModel, ModelMixin): + """Model for task actions""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Unique action identifier") + method: str = Field(..., description="Method to execute") + action: str = Field(..., description="Action to perform") + parameters: Dict[str, Any] = Field(default_factory=dict, description="Action parameters") + status: TaskStatus = Field(default=TaskStatus.PENDING, description="Current action status") + retryCount: int = Field(default=0, description="Number of retry attempts") + retryMax: int = Field(default=3, description="Maximum number of retry attempts") + error: Optional[str] = Field(None, description="Error message if action failed") + startedAt: Optional[datetime] = Field(None, description="Action start timestamp") + finishedAt: Optional[datetime] = Field(None, description="Action completion timestamp") + + def start(self) -> None: + """Start the action""" + self.status = TaskStatus.RUNNING + self.startedAt = datetime.now(UTC) + + def complete(self) -> None: + """Mark action as completed""" + self.status = TaskStatus.COMPLETED + self.finishedAt = datetime.now(UTC) + + def fail(self, error: str) -> None: + """Mark action as failed""" + self.status = TaskStatus.FAILED + self.error = error + self.finishedAt = datetime.now(UTC) + + def canRetry(self) -> bool: + """Check if action can be retried""" + return self.retryCount < self.retryMax + + def incrementRetry(self) -> None: + """Increment retry count""" + self.retryCount += 1 + +# Register labels for TaskAction +register_model_labels( + "TaskAction", + {"en": "Task Action", "fr": "Action de tâche"}, + { + "id": {"en": "ID", "fr": "ID"}, + "method": {"en": "Method", "fr": "Méthode"}, + "action": {"en": "Action", "fr": "Action"}, + "parameters": {"en": "Parameters", "fr": "Paramètres"}, + "status": {"en": "Status", "fr": "Statut"}, + "retryCount": {"en": "Retry Count", "fr": "Nombre de tentatives"}, + "retryMax": {"en": "Max Retries", "fr": "Tentatives maximales"}, + "error": {"en": "Error", "fr": "Erreur"}, + "startedAt": {"en": "Started At", "fr": "Démarré le"}, + "finishedAt": {"en": "Finished At", "fr": "Terminé le"} + } +) + class TaskItem(BaseModel, ModelMixin): """Model for tasks""" id: str = Field(..., description="Unique task identifier") @@ -169,7 +225,7 @@ class TaskItem(BaseModel, ModelMixin): error: Optional[str] = Field(None, description="Error message if task failed") startedAt: Optional[datetime] = Field(None, description="Task start timestamp") finishedAt: Optional[datetime] = Field(None, description="Task completion timestamp") - actionList: List[Dict[str, Any]] = Field(default_factory=list, description="List of actions to execute") + actionList: List[TaskAction] = Field(default_factory=list, description="List of actions to execute") documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents") retryCount: int = Field(default=0, description="Number of retry attempts") retryMax: int = Field(default=3, description="Maximum number of retry attempts") @@ -229,7 +285,7 @@ class TaskItem(BaseModel, ModelMixin): if taskId in self.dependencies: self.dependencies.remove(taskId) - def addAction(self, action: Dict[str, Any]) -> None: + def addAction(self, action: TaskAction) -> None: """Add an action to the task""" self.actionList.append(action) @@ -270,6 +326,7 @@ class TaskResult(BaseModel, ModelMixin): error: Optional[str] = Field(None, description="Error message if task failed") data: Optional[Dict[str, Any]] = Field(None, description="Result data") documents: List[ChatDocument] = Field(default_factory=list, description="Output documents") + documentsLabel: Optional[str] = Field(None, description="Label for the set of documents") feedback: Optional[str] = Field(None, description="Task feedback message") processingTime: Optional[float] = Field(None, description="Processing time in seconds") timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC), description="When the result was created") diff --git a/modules/interfaces/serviceManagementAccess.py b/modules/interfaces/serviceManagementAccess.py index f52a253d..ef842b2f 100644 --- a/modules/interfaces/serviceManagementAccess.py +++ b/modules/interfaces/serviceManagementAccess.py @@ -153,7 +153,7 @@ class ManagementAccess: # For regular users and admins, check specific cases if recordId is not None: # Get the record to check ownership - records = self.db.getRecordset(table, recordFilter={"id": recordId}) + records: List[Dict[str, Any]] = self.db.getRecordset(table, recordFilter={"id": recordId}) if not records: return False diff --git a/modules/interfaces/serviceManagementClass.py b/modules/interfaces/serviceManagementClass.py index bf97d845..e478ac18 100644 --- a/modules/interfaces/serviceManagementClass.py +++ b/modules/interfaces/serviceManagementClass.py @@ -6,14 +6,14 @@ Uses the JSON connector for data access with added language support. import os import logging import uuid -from datetime import datetime +from datetime import datetime, UTC from typing import Dict, Any, List, Optional, Union import hashlib from modules.interfaces.serviceManagementAccess import ManagementAccess from modules.interfaces.serviceManagementModel import ( - Prompt, FileItem, FileData + FilePreview, Prompt, FileItem, FileData ) from modules.interfaces.serviceAppModel import User, Mandate, UserPrivilege @@ -376,6 +376,47 @@ class ServiceManagement: } return extensionToMime.get(ext.lower(), "application/octet-stream") + def isTextMimeType(self, mimeType: str) -> bool: + """Determines if a MIME type represents a text-based format.""" + textMimeTypes = { + 'text/plain', + 'text/html', + 'text/css', + 'text/javascript', + 'text/x-python', + 'text/csv', + 'text/xml', + 'application/json', + 'application/xml', + 'application/javascript', + 'application/x-python', + 'application/x-httpd-php', + 'application/x-sh', + 'application/x-shellscript', + 'application/x-yaml', + 'application/x-toml', + 'application/x-markdown', + 'application/x-latex', + 'application/x-tex', + 'application/x-rst', + 'application/x-asciidoc', + 'application/x-markdown', + 'application/x-httpd-php', + 'application/x-httpd-php-source', + 'application/x-httpd-php3', + 'application/x-httpd-php4', + 'application/x-httpd-php5', + 'application/x-httpd-php7', + 'application/x-httpd-php8', + 'application/x-httpd-php-source', + 'application/x-httpd-php3-source', + 'application/x-httpd-php4-source', + 'application/x-httpd-php5-source', + 'application/x-httpd-php7-source', + 'application/x-httpd-php8-source' + } + return mimeType.lower() in textMimeTypes + # File methods - metadata-based operations def getAllFiles(self) -> List[FileItem]: @@ -440,15 +481,47 @@ class ServiceManagement: logger.error(f"Error converting file record: {str(e)}") return None + def _isFilenameUnique(self, filename: str, excludeFileId: Optional[str] = None) -> bool: + """Checks if a filename is unique for the current user.""" + # Get all files for current user + files = self.db.getRecordset("files", recordFilter={ + "_createdBy": self.currentUser.id + }) + + # Check if filename exists (excluding the current file if updating) + for file in files: + if file["filename"] == filename and (excludeFileId is None or file["id"] != excludeFileId): + return False + return True + + def _generateUniqueFilename(self, filename: str, excludeFileId: Optional[str] = None) -> str: + """Generates a unique filename by adding a number if necessary.""" + if self._isFilenameUnique(filename, excludeFileId): + return filename + + # Split filename into name and extension + name, ext = os.path.splitext(filename) + counter = 1 + + # Try filenames with increasing numbers until we find a unique one + while True: + newFilename = f"{name}_{counter}{ext}" + if self._isFilenameUnique(newFilename, excludeFileId): + return newFilename + counter += 1 + def createFile(self, name: str, mimeType: str, size: int = None, fileHash: str = None) -> FileItem: """Creates a new file entry if user has permission.""" if not self._canModify("files"): raise PermissionError("No permission to create files") + # Ensure filename is unique + uniqueName = self._generateUniqueFilename(name) + # Create FileItem instance fileItem = FileItem( mandateId=self.currentUser.mandateId, - filename=name, + filename=uniqueName, mimeType=mimeType, fileSize=size, fileHash=fileHash @@ -468,6 +541,10 @@ class ServiceManagement: if not self._canModify("files", fileId): raise PermissionError(f"No permission to update file {fileId}") + # If filename is being updated, ensure it's unique + if "filename" in updateData: + updateData["filename"] = self._generateUniqueFilename(updateData["filename"], fileId) + # Update file return self.db.recordModify("files", fileId, updateData) @@ -525,7 +602,7 @@ class ServiceManagement: # Determine if this is a text-based format mimeType = file.mimeType - isTextFormat = isTextMimeType(mimeType) + isTextFormat = self.isTextMimeType(mimeType) base64Encoded = False fileData = None @@ -598,8 +675,8 @@ class ServiceManagement: logger.error(f"Error processing file data for {fileId}: {str(e)}") return None - def getFilePreview(self, fileId: str) -> Optional[Dict[str, Any]]: - """Returns a preview of the file content if user has access.""" + def getFileContent(self, fileId: str) -> Optional[FilePreview]: + """Returns the full file content if user has access.""" try: # Get file metadata file = self.getFile(fileId) @@ -613,35 +690,39 @@ class ServiceManagement: logger.warning(f"No content found for file ID {fileId}") return None - # Determine if content is text based on MIME type - isText = file.mimeType.startswith(('text/', 'application/json', 'application/xml', 'application/javascript')) + # Process content based on file type + contentType = "binary" + content = "" - # For text content, decode to string - if isText: + if file.get("mimeType", "").startswith("text/"): + # For text files, return full content try: content = fileContent.decode('utf-8') - encoding = 'utf-8' + contentType = "text" except UnicodeDecodeError: - try: - content = fileContent.decode('latin-1') - encoding = 'latin-1' - except: - content = fileContent - encoding = None + content = fileContent.decode('latin-1') + contentType = "text" + elif file.get("mimeType", "").startswith("image/"): + # For images, return base64 + contentType = "base64" + content = f"data:{file['mimeType']};base64,{fileContent.hex()}" else: - content = fileContent - encoding = None + # For other files, return as base64 + contentType = "base64" + content = f"data:{file['mimeType']};base64,{fileContent.hex()}" + + return FilePreview( + id=fileId, + name=file.get("name", "Unknown"), + mimeType=file.get("mimeType", "application/octet-stream"), + size=file.get("size", 0), + content=content, + contentType=contentType, + metadata=file.get("metadata", {}) + ) - return { - "content": content, - "mimeType": file.mimeType, - "filename": file.filename, - "isText": isText, - "encoding": encoding, - "size": len(fileContent) - } except Exception as e: - logger.error(f"Error getting file preview for {fileId}: {str(e)}") + logger.error(f"Error getting file content: {str(e)}") return None def updateFileData(self, fileId: str, data: Union[bytes, str]) -> bool: @@ -661,7 +742,7 @@ class ServiceManagement: # Determine if this is a text-based format mimeType = file.mimeType - isTextFormat = isTextMimeType(mimeType) + isTextFormat = self.isTextMimeType(mimeType) base64Encoded = False fileData = None diff --git a/modules/methods/methodBase.py b/modules/methods/methodBase.py index 2eaa845f..7952b68e 100644 --- a/modules/methods/methodBase.py +++ b/modules/methods/methodBase.py @@ -3,32 +3,10 @@ from typing import Dict, List, Optional, Any, Literal from datetime import datetime, UTC from pydantic import BaseModel, Field import logging +from modules.interfaces.serviceChatModel import MethodResult logger = logging.getLogger(__name__) -class AuthSource(str, Enum): - """Authentication source enumeration""" - LOCAL = "local" - MSFT = "msft" - GOOGLE = "google" - # Add more auth sources as needed - -class MethodParameter(BaseModel): - """Model for method parameters""" - name: str - type: str - required: bool - validation: Optional[callable] = None - description: str - -class MethodResult(BaseModel): - """Model for method results""" - success: bool - data: Dict[str, Any] - metadata: Dict[str, Any] = Field(default_factory=dict) - validation: List[str] = Field(default_factory=list) - error: Optional[str] = Field(None, description="Error message if any") - class MethodBase: """Base class for all methods""" @@ -37,7 +15,6 @@ class MethodBase: self.service = serviceContainer self.name: str self.description: str - self.authSource: AuthSource = AuthSource.LOCAL # Default to local auth self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") @property @@ -123,16 +100,6 @@ class MethodBase: """Rollback specific action - to be implemented by subclasses""" pass - def _validateAuth(self, authData: Optional[Dict[str, Any]] = None) -> bool: - """Validate authentication data""" - try: - if self.authSource == AuthSource.LOCAL: - return True - return bool(authData and authData.get('source') == self.authSource) - except Exception as e: - self.logger.error(f"Error validating auth: {str(e)}") - return False - def _createResult(self, success: bool, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None, error: Optional[str] = None) -> MethodResult: """Create a method result""" return MethodResult( diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py index f9b06f21..7b9a1755 100644 --- a/modules/methods/methodCoder.py +++ b/modules/methods/methodCoder.py @@ -3,7 +3,7 @@ import logging import ast import re -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.methods.methodBase import MethodBase, MethodResult logger = logging.getLogger(__name__) @@ -14,7 +14,6 @@ class MethodCoder(MethodBase): super().__init__() self.name = "coder" self.description = "Handle code operations like analysis, generation, and refactoring" - self.authSource = AuthSource.LOCAL # Code operations typically don't need auth @property def actions(self) -> Dict[str, Dict[str, Any]]: diff --git a/modules/methods/methodOperator.py b/modules/methods/methodOperator.py new file mode 100644 index 00000000..9082ccf5 --- /dev/null +++ b/modules/methods/methodOperator.py @@ -0,0 +1,178 @@ +from typing import Dict, List, Any, Optional +from datetime import datetime, UTC +import logging +from .methodBase import MethodBase +from modules.interfaces.serviceChatModel import MethodResult + +logger = logging.getLogger(__name__) + +class MethodOperator(MethodBase): + """Operator methods for handling collections and AI operations""" + + def __init__(self, serviceContainer: Any): + super().__init__(serviceContainer) + self.name = "operator" + self.description = "Operator methods for handling collections and AI operations" + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "forEach": { + "description": "Execute an action for each item in a list", + "parameters": { + "items": { + "type": "List[Any]", + "description": "List of items to process", + "required": True + }, + "action": { + "type": "Dict[str, Any]", + "description": "Action to execute for each item", + "required": True, + "properties": { + "method": {"type": "str", "required": True}, + "action": {"type": "str", "required": True}, + "parameters": {"type": "Dict[str, Any]", "required": False} + } + } + } + }, + "aiCall": { + "description": "Call AI service with document content", + "parameters": { + "prompt": { + "type": "str", + "description": "Prompt for AI processing", + "required": True + }, + "extractedDocumentContent": { + "type": "List[Dict[str, str]]", + "description": "List of documents and their extraction prompts", + "required": True, + "items": { + "type": "object", + "properties": { + "document": {"type": "str", "required": True}, + "promptForContentExtraction": {"type": "str", "required": True} + } + } + } + } + } + } + + async def _executeAction(self, action: str, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute operator action""" + if action == "forEach": + return await self._executeForEach(parameters) + elif action == "aiCall": + return await self._executeAiCall(parameters) + else: + raise ValueError(f"Unsupported action: {action}") + + async def _executeForEach(self, parameters: Dict[str, Any]) -> MethodResult: + """Execute forEach operation""" + try: + items = parameters.get("items", []) + action = parameters.get("action", {}) + + if not items or not action: + return self._createResult( + success=False, + data={}, + error="Missing required parameters" + ) + + results = [] + for item in items: + try: + # Execute action for each item + method = action.get("method") + action_name = action.get("action") + action_params = action.get("parameters", {}) + + # Add current item to parameters + action_params["item"] = item + + # Execute method action + method_result = await self.service.methods[method][action_name](action_params) + results.append(method_result) + + except Exception as e: + logger.error(f"Error processing item: {str(e)}") + results.append({ + "success": False, + "error": str(e) + }) + + return self._createResult( + success=True, + data={"results": results} + ) + + except Exception as e: + logger.error(f"Error in forEach execution: {str(e)}") + return self._createResult( + success=False, + data={}, + error=str(e) + ) + + async def _executeAiCall(self, parameters: Dict[str, Any]) -> MethodResult: + """Execute AI call with document content""" + try: + prompt = parameters.get("prompt") + documents = parameters.get("extractedDocumentContent", []) + + if not prompt: + return self._createResult( + success=False, + data={}, + error="Missing prompt parameter" + ) + + # Extract content from documents + extracted_content = [] + for doc in documents: + try: + doc_ref = doc.get("document") + doc_prompt = doc.get("promptForContentExtraction") + + if not doc_ref or not doc_prompt: + continue + + # Extract content using document manager + content = self.service.extractContent(doc_prompt, doc_ref) + extracted_content.append({ + "document": doc_ref, + "content": content + }) + + except Exception as e: + logger.error(f"Error extracting document content: {str(e)}") + continue + + # Prepare AI prompt with extracted content + full_prompt = f"{prompt}\n\nExtracted Content:\n" + for content in extracted_content: + full_prompt += f"\nDocument: {content['document']}\n{content['content']}\n" + + # Call AI service + response = await self.service.callAiBasic(full_prompt) + + return self._createResult( + success=True, + data={ + "response": response, + "processedDocuments": len(extracted_content) + } + ) + + except Exception as e: + logger.error(f"Error in AI call execution: {str(e)}") + return self._createResult( + success=False, + data={}, + error=str(e) + ) diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py index 8391ec96..68dd07d0 100644 --- a/modules/methods/methodOutlook.py +++ b/modules/methods/methodOutlook.py @@ -3,7 +3,7 @@ import logging from datetime import datetime, UTC from O365 import Account, MSGraphProtocol -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.methods.methodBase import MethodBase, MethodResult from modules.models.userConnection import UserConnection logger = logging.getLogger(__name__) @@ -15,7 +15,6 @@ class MethodOutlook(MethodBase): super().__init__() self.name = "outlook" self.description = "Handle Outlook email operations like reading and sending emails" - self.authSource = AuthSource.MICROSOFT @property def actions(self) -> Dict[str, Dict[str, Any]]: diff --git a/modules/methods/methodPowerpoint.py b/modules/methods/methodPowerpoint.py index f85915fa..26ccf458 100644 --- a/modules/methods/methodPowerpoint.py +++ b/modules/methods/methodPowerpoint.py @@ -3,7 +3,7 @@ import logging import os from pathlib import Path -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.methods.methodBase import MethodBase, MethodResult from modules.models.userConnection import UserConnection from modules.models.account import Account from modules.protocols.msGraphProtocol import MSGraphProtocol @@ -17,7 +17,6 @@ class MethodPowerpoint(MethodBase): super().__init__() self.name = "powerpoint" self.description = "Handle PowerPoint operations like reading, writing, and converting presentations" - self.authSource = AuthSource.MICROSOFT # PowerPoint operations need Microsoft auth @property def actions(self) -> Dict[str, Dict[str, Any]]: diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py index 06cd4ee8..e75fbd6f 100644 --- a/modules/methods/methodSharepoint.py +++ b/modules/methods/methodSharepoint.py @@ -7,7 +7,7 @@ from office365.sharepoint.files.file import File from office365.sharepoint.lists.list import List from office365.sharepoint.lists.list_creation_information import ListCreationInformation -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.methods.methodBase import MethodBase, MethodResult from modules.models.userConnection import UserConnection logger = logging.getLogger(__name__) @@ -19,7 +19,6 @@ class MethodSharepoint(MethodBase): super().__init__() self.name = "sharepoint" self.description = "Handle SharePoint document operations like search, read, and write" - self.authSource = AuthSource.MICROSOFT @property def actions(self) -> Dict[str, Dict[str, Any]]: diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py index 437bca37..9398741c 100644 --- a/modules/methods/methodWeb.py +++ b/modules/methods/methodWeb.py @@ -10,7 +10,7 @@ import requests import time import json -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.methods.methodBase import MethodBase, MethodResult from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) @@ -22,7 +22,6 @@ class MethodWeb(MethodBase): super().__init__() self.name = "web" self.description = "Handle web operations like search, crawl, and content extraction" - self.auth_source = AuthSource.LOCAL # Web operations typically don't need auth # Web crawling configuration from agentWebcrawler self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "") diff --git a/modules/shared/attributeUtils.py b/modules/shared/attributeUtils.py index 6382592f..f6b0e291 100644 --- a/modules/shared/attributeUtils.py +++ b/modules/shared/attributeUtils.py @@ -23,9 +23,9 @@ class ModelMixin: """ # Get the raw dictionary if hasattr(self, 'model_dump'): - data = self.model_dump() # Pydantic v2 + data: Dict[str, Any] = self.model_dump() # Pydantic v2 else: - data = self.dict() # Pydantic v1 + data: Dict[str, Any] = self.dict() # Pydantic v1 # Convert datetime fields to ISO format strings for key, value in data.items(): diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py index 90d07904..fbc2429e 100644 --- a/modules/workflow/managerChat.py +++ b/modules/workflow/managerChat.py @@ -1,22 +1,14 @@ import logging -import importlib -import pkgutil -import inspect -from typing import Dict, Any, Optional, List, Type, Callable, Awaitable, Union +from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC import json -import base64 import uuid -from modules.interfaces.serviceAppClass import User -from modules.methods.methodBase import MethodBase, AuthSource, MethodResult -from modules.workflow.serviceContainer import ServiceContainer +from modules.interfaces.serviceAppModel import User from modules.interfaces.serviceChatModel import ( - TaskStatus, UserInputRequest, ContentMetadata, ContentItem, - ChatDocument, TaskDocument, ExtractedContent, TaskItem, - TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow + TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) -from modules.workflow.processorDocument import DocumentProcessor +from modules.workflow.serviceContainer import ServiceContainer logger = logging.getLogger(__name__) @@ -24,68 +16,21 @@ class ChatManager: """Chat manager with improved AI integration and method handling""" def __init__(self, currentUser: User): - self._discoverMethods() - self.workflow: Optional[ChatWorkflow] = None - self.currentTask: Optional[TaskItem] = None - self.workflowHistory: List[ChatMessage] = [] - self.documentProcessor = DocumentProcessor() - self.userLanguage = None self.currentUser = currentUser + self.service: ServiceContainer = None # ===== Initialization and Setup ===== async def initialize(self, workflow: ChatWorkflow) -> None: """Initialize chat manager with workflow""" - self.service.workflow = workflow - - # Initialize AI model - self.service.model = { - 'callAiBasic': self._callAiBasic, - 'callAiAdvanced': self._callAiAdvanced - } - - # Initialize document processor - self.service.documentProcessor.initialize() + self.workflow = workflow + self.service = ServiceContainer(self.currentUser, self.workflow) - def setUserLanguage(self, languageCode: str): - """Set the user's preferred language""" - self.userLanguage = languageCode - logger.debug(f"User language set to: {languageCode}") - - def _discoverMethods(self): - """Dynamically discover all method classes in modules.methods package""" - try: - # Import the methods package - methodsPackage = importlib.import_module('modules.methods') - - # Discover all modules in the package - for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): - if not isPkg and name.startswith('method'): - try: - # Import the module - module = importlib.import_module(f'modules.methods.{name}') - - # Find all classes in the module that inherit from MethodBase - for itemName, item in inspect.getmembers(module): - if (inspect.isclass(item) and - issubclass(item, MethodBase) and - item != MethodBase): - # Instantiate the method and add to service - methodInstance = item() - self.service.methods[methodInstance.name] = methodInstance - logger.info(f"Discovered method: {methodInstance.name}") - - except Exception as e: - logger.error(f"Error loading method module {name}: {str(e)}") - - except Exception as e: - logger.error(f"Error discovering methods: {str(e)}") - # ===== Task Creation and Management ===== async def createInitialTask(self, workflow: ChatWorkflow, initialMessage: ChatMessage) -> Optional[TaskItem]: """Create the initial task from the first message""" try: # Get available methods and their actions - methodCatalog = self.service.getAvailableMethods() + methodCatalog = self.service.getMethodsCatalog() # Process user input with AI processedInput = await self._processUserInput(initialMessage.message, methodCatalog) @@ -93,21 +38,19 @@ class ChatManager: # Create actions from processed input actions = await self._createActions(processedInput['actions']) - # Create task - task = TaskItem( - id=f"task_{datetime.now(UTC).timestamp()}", - workflowId=workflow.id, - userInput=processedInput['objective'], - dataList=initialMessage.documents, - actionList=actions, - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) + # Create task data + taskData = { + "workflowId": workflow.id, + "userInput": processedInput['objective'], + "dataList": initialMessage.documents, + "actionList": [action.dict() for action in actions], + "status": TaskStatus.PENDING, + "startedAt": datetime.now(UTC).isoformat(), + "updatedAt": datetime.now(UTC).isoformat() + } - # Add task to workflow - workflow.tasks.append(task) - return task + # Create task using ChatInterface + return self.service.createTask(taskData) except Exception as e: logger.error(f"Error creating initial task: {str(e)}") @@ -127,20 +70,18 @@ class ChatManager: logger.error("No task data in previous result") return None - # Create next task - nextTask = TaskItem( - id=f"task_{datetime.now(UTC).timestamp()}", - workflowId=workflow.id, - userInput=taskData.get('objective', ''), - actionList=await self._createActions(taskData.get('actions', [])), - status=TaskStatus.PENDING, - createdAt=datetime.now(UTC), - updatedAt=datetime.now(UTC) - ) + # Create task data + taskData = { + "workflowId": workflow.id, + "userInput": taskData.get('objective', ''), + "actionList": [action.dict() for action in await self._createActions(taskData.get('actions', []))], + "status": TaskStatus.PENDING, + "startedAt": datetime.now(UTC).isoformat(), + "updatedAt": datetime.now(UTC).isoformat() + } - # Add task to workflow - workflow.tasks.append(nextTask) - return nextTask + # Create task using ChatInterface + return self.service.createTask(taskData) except Exception as e: logger.error(f"Error creating next task: {str(e)}") @@ -163,7 +104,7 @@ class ChatManager: """ # Get AI response - response = await self._callAiBasic(prompt) + response = await self.service.callAiBasic(prompt) # Parse response try: @@ -173,7 +114,8 @@ class ChatManager: status=TaskStatus.COMPLETED, success=True, timestamp=datetime.now(UTC), - data=result + data=result, + documentsLabel="Task Results" ) except json.JSONDecodeError as e: logger.error(f"Error parsing AI response: {str(e)}") @@ -182,7 +124,8 @@ class ChatManager: status=TaskStatus.FAILED, success=False, timestamp=datetime.now(UTC), - error=f"Error parsing AI response: {str(e)}" + error=f"Error parsing AI response: {str(e)}", + documentsLabel="Task Error" ) except Exception as e: @@ -192,58 +135,10 @@ class ChatManager: status=TaskStatus.FAILED, success=False, timestamp=datetime.now(UTC), - error=f"Error identifying next task: {str(e)}" + error=f"Error identifying next task: {str(e)}", + documentsLabel="Task Error" ) - async def callAi(self, messages: List[Dict[str, str]], produceUserAnswer: bool = False, temperature: float = None) -> str: - """Enhanced AI service call with language support.""" - if not self.service or not self.service.base: - logger.error("AI service not set in ChatManager") - return "Error: AI service not available" - - # Add language instruction for user-facing responses - if produceUserAnswer and self.userLanguage: - ltext = f"Please respond in '{self.userLanguage}' language." - if messages and messages[0]["role"] == "system": - if "language" not in messages[0]["content"].lower(): - messages[0]["content"] = f"{ltext} {messages[0]['content']}" - else: - # Insert a system message with language instruction - messages.insert(0, { - "role": "system", - "content": ltext - }) - - # Call the AI service - if temperature is not None: - return await self.service.base.callAi(messages, temperature=temperature) - else: - return await self.service.base.callAi(messages) - - async def callAi4Image(self, imageData: Union[str, bytes], mimeType: str = None, prompt: str = "Describe this image") -> str: - """Enhanced AI service call with language support.""" - if not self.service or not self.service.base: - logger.error("AI service not set in ChatManager") - return "Error: AI service not available" - return await self.service.base.analyzeImage(imageData, mimeType, prompt) - - async def _callAiBasic(self, prompt: str) -> str: - """Call basic AI service""" - try: - if not self.service or not self.service.base: - raise ValueError("Service or base interface not initialized") - return await self.callAi([ - {"role": "system", "content": "You are an AI assistant that helps process user requests."}, - {"role": "user", "content": prompt} - ]) - except Exception as e: - logger.error(f"Error calling AI service: {str(e)}") - raise - - async def _callAiAdvanced(self, prompt: str, context: Dict[str, Any]) -> str: - """Call advanced AI model with context""" - # TODO: Implement actual AI call - return "AI response placeholder" async def generateWorkflowFeedback(self, workflow: ChatWorkflow) -> str: """ @@ -264,9 +159,6 @@ class ChatManager: "duration": (datetime.now(UTC) - datetime.fromisoformat(workflow.startedAt)).total_seconds() } - # Get user language from workflow mandate - userLanguage = workflow.mandateId.split('_')[0] if workflow.mandateId else 'en' - self.setUserLanguage(userLanguage) # Prepare messages for AI context messages = [ @@ -377,9 +269,22 @@ Document: {document.filename} ({document.mimeType}) # Create output documents result.documents = await self._createOutputDocuments(task) + # Set documents label based on task input + result.documentsLabel = "TaskResult" else: result.feedback = f"Task failed: {result.error}" + # Update task in database + self.service.updateTask(task.id, { + "status": result.status, + "error": result.error, + "finishedAt": datetime.now(UTC).isoformat(), + "actionList": [action.dict() for action in task.actionList], + "documentsOutput": result.documents, + "feedback": result.feedback, + "documentsLabel": result.documentsLabel + }) + return result except Exception as e: @@ -390,15 +295,17 @@ Document: {document.filename} ({document.mimeType}) """Process and store task result in workflow""" try: # Find task in workflow - task = next((t for t in workflow.tasks if t.id == result.taskId), None) + task = self.service.getTask(result.taskId) if not task: logger.error(f"Task {result.taskId} not found in workflow") return # Update task status - task.status = result.status - if result.error: - task.error = result.error + self.service.updateTask(task.id, { + "status": result.status, + "error": result.error, + "finishedAt": datetime.now(UTC).isoformat() + }) # Create feedback message if available if result.feedback: @@ -410,13 +317,14 @@ Document: {document.filename} ({document.mimeType}) status="step", documents=result.documents ) - workflow.messages.append(message) + self.service.createWorkflowMessage(message.dict()) # Update workflow stats if result.processingTime: if not workflow.stats: workflow.stats = ChatStat() workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime + self.service.updateWorkflow(workflow.id, {"stats": workflow.stats.dict()}) except Exception as e: logger.error(f"Error parsing task result: {str(e)}") @@ -429,13 +337,16 @@ Document: {document.filename} ({document.mimeType}) if workflow.status in ["completed", "failed", "stopped"]: return False + # Get all tasks for the workflow + tasks = self.service.tasks + # Check if there are any pending tasks - hasPendingTasks = any(t.status == TaskStatus.PENDING for t in workflow.tasks) + hasPendingTasks = any(t.status == TaskStatus.PENDING for t in tasks) if not hasPendingTasks: return False # Check if any task is currently running - hasRunningTasks = any(t.status == TaskStatus.RUNNING for t in workflow.tasks) + hasRunningTasks = any(t.status == TaskStatus.RUNNING for t in tasks) if hasRunningTasks: return True @@ -444,7 +355,7 @@ Document: {document.filename} ({document.mimeType}) except Exception as e: logger.error(f"Error checking workflow continuation: {str(e)}") return False - + async def _summarizeWorkflow(self) -> str: """Summarize workflow history""" if not self.workflow.messages: @@ -460,7 +371,7 @@ Document: {document.filename} ({document.mimeType}) 4. Any issues or blockers """ - return await self._callAiBasic(prompt) + return await self.service.callAiBasic(prompt) async def _analyzeTaskResults(self, task: TaskItem) -> Dict[str, Any]: """Analyze task results to determine next steps""" @@ -469,8 +380,8 @@ Document: {document.filename} ({document.mimeType}) # Generate prompt for analysis prompt = f"""Based on the workflow summary and task results: - {summary} - +{summary} + Task: {task.userInput} Status: {task.status} Error: {task.error if task.error else 'None'} @@ -483,7 +394,7 @@ Document: {document.filename} ({document.mimeType}) """ # Get AI response - response = await self._callAiBasic(prompt) + response = await self.service.callAiBasic(prompt) # Parse response return json.loads(response) @@ -548,10 +459,10 @@ User Input: {userInput.get('message', '')} {self._promptInstructions(methodCatalog, isInitialTask=True)}""" # Call AI service - response = await self._callAiBasic(prompt) + response = await self.service.callAiBasic(prompt) return json.loads(response) - async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskItem]: + async def _createActions(self, actionsData: List[Dict[str, Any]]) -> List[TaskAction]: """Create action objects from processed input""" actions = [] for actionData in actionsData: @@ -561,7 +472,7 @@ User Input: {userInput.get('message', '')} logger.warning(f"Skipping invalid action data: {actionData}") continue - action = TaskItem( + action = TaskAction( id=f"action_{datetime.now(UTC).timestamp()}", method=actionData['method'], action=actionData['action'], @@ -585,9 +496,9 @@ User Input: {userInput.get('message', '')} docPrompt = self._generateDocumentPrompt(task.userInput) # Get AI response for document generation - docResponse = await self._callAiBasic(docPrompt) + docResponse = await self.service.callAiBasic(docPrompt) - # Parse response into TaskDocument objects + # Parse response into Task-Document objects try: taskDocs = json.loads(docResponse) task.documentsOutput = taskDocs @@ -596,7 +507,7 @@ User Input: {userInput.get('message', '')} return f"Error processing results: {str(e)}" # Generate feedback - feedback = await self._callAiBasic( + feedback = await self.service.callAiBasic( f"""Generate feedback for the completed task: Task: {task.userInput} Generated Documents: {len(task.documentsOutput)} files @@ -616,24 +527,15 @@ User Input: {userInput.get('message', '')} try: fileIds = [] - # Process each TaskDocument from AI output + # Process each Task-Document from AI output for taskDoc in task.documentsOutput: # Store file in database - fileItem = self.service.functions.createFile( - name=taskDoc.filename, - mimeType=taskDoc.mimeType + fileItem = self.service.createFile( + fileName=taskDoc.filename, + mimeType=taskDoc.mimeType, + content=taskDoc.data, + base64Encoded=taskDoc.base64Encoded ) - - # Store file content - if taskDoc.base64Encoded: - # Decode base64 content - content = base64.b64decode(taskDoc.data) - else: - # Use text content directly - content = taskDoc.data.encode('utf-8') - - # Store file data - self.service.functions.createFileData(fileItem.id, content) fileIds.append(fileItem.id) # Convert all files to ChatDocuments in one call @@ -650,8 +552,8 @@ User Input: {userInput.get('message', '')} documents = [] for fileId in fileIds: # Get file metadata - fileMetadata = self.service.functions.getFile(fileId) - if not fileMetadata: + fileInfo = self.service.getFileInfo(fileId) + if not fileInfo: logger.warning(f"File metadata not found for {fileId}") continue @@ -659,48 +561,14 @@ User Input: {userInput.get('message', '')} document = ChatDocument( id=str(uuid.uuid4()), fileId=fileId, - filename=fileMetadata.get("name", "Unknown"), - fileSize=fileMetadata.get("size", 0), - mimeType=fileMetadata.get("mimeType", "text/plain") + filename=fileInfo.get("name", "Unknown"), + fileSize=fileInfo.get("size", 0), + mimeType=fileInfo.get("mimeType", "text/plain") ) documents.append(document) return documents - async def addTaskResult(self, workflow: ChatWorkflow, result: TaskResult) -> None: - """Add task result to workflow and update status""" - try: - # Find task in workflow - task = next((t for t in workflow.tasks if t.id == result.taskId), None) - if not task: - logger.error(f"Task {result.taskId} not found in workflow") - return - - # Update task status - task.status = result.status - if result.error: - task.error = result.error - - # Create feedback message if available - if result.feedback: - message = ChatMessage( - id=str(uuid.uuid4()), - workflowId=workflow.id, - role="assistant", - message=result.feedback, - status="step", - documents=result.documents - ) - workflow.messages.append(message) - - # Update workflow stats - if result.processingTime: - if not workflow.stats: - workflow.stats = ChatStat() - workflow.stats.processingTime = (workflow.stats.processingTime or 0) + result.processingTime - - except Exception as e: - logger.error(f"Error adding task result: {str(e)}") def _generateDocumentPrompt(self, task: str) -> str: """Generate a prompt for document generation""" @@ -708,7 +576,7 @@ User Input: {userInput.get('message', '')} Task: {task} -For each document you need to generate, provide a TaskDocument object with the following structure: +For each document you need to generate, provide a Document object with the following structure: {{ "filename": "string", # Filename with extension "mimeType": "string", # MIME type of the file @@ -722,5 +590,5 @@ Rules: 3. Use appropriate MIME types (e.g., text/plain, image/jpeg, application/pdf) 4. Include file extensions in filenames -Return a JSON array of TaskDocument objects. +Return a JSON array of Document objects. """ \ No newline at end of file diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py index 30cb8194..1e461c0a 100644 --- a/modules/workflow/managerDocument.py +++ b/modules/workflow/managerDocument.py @@ -8,10 +8,7 @@ import uuid from modules.interfaces.serviceChatModel import ( ChatDocument, - TaskDocument, - ExtractedContent, - ContentItem, - ContentMetadata + ExtractedContent ) from modules.workflow.serviceContainer import ServiceContainer from modules.workflow.processorDocument import DocumentProcessor @@ -23,82 +20,12 @@ class DocumentManager: def __init__(self, serviceContainer: ServiceContainer): self.service = serviceContainer - self._processor = DocumentProcessor() + self._processor = DocumentProcessor(serviceContainer) - async def extractFromChatDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: - """ - Extract content from a ChatDocument with AI processing. - - Args: - prompt: Prompt for AI content extraction - document: The ChatDocument to process - - Returns: - ExtractedContent containing the processed content - """ - # Convert ChatDocument to TaskDocument - taskDoc = await self._convertToTaskDocument(document) - - # Process document using processor - extractedContent = await self._processor.processDocument(taskDoc, prompt) - - # Update the objectId and objectType to reference the original ChatDocument - extractedContent.objectId = document.id - extractedContent.objectType = "ChatDocument" - - return extractedContent - - async def extractFromTaskDocument(self, prompt: str, document: TaskDocument) -> ExtractedContent: - """ - Extract content directly from a task document. - - Args: - prompt: The prompt to use for content extraction - document: The task document to extract content from - - Returns: - ExtractedContent containing the processed content - - Raises: - ValueError: If document is invalid - IOError: If file cannot be read - """ + async def extractContent(self, prompt: str, document: ChatDocument) -> ExtractedContent: + """Extract content from document using prompt""" try: return await self._processor.processDocument(document, prompt) except Exception as e: - logger.error(f"Error extracting from task document: {str(e)}") - raise - - async def _convertToTaskDocument(self, chatDoc: ChatDocument) -> TaskDocument: - """ - Convert a ChatDocument to a TaskDocument. - - Args: - chatDoc: The chat document to convert - - Returns: - TaskDocument containing the converted data - - Raises: - ValueError: If document is invalid - IOError: If file cannot be read - """ - try: - # Get file content - fileContent = await self.service.functions.getFileData(chatDoc.fileId) - if not fileContent: - raise ValueError(f"Could not get content for file {chatDoc.fileId}") - - # Convert to base64 - base64Data = base64.b64encode(fileContent).decode('utf-8') - - return TaskDocument( - id=str(uuid.uuid4()), - filename=chatDoc.filename, - fileSize=chatDoc.fileSize, - mimeType=chatDoc.mimeType, - data=base64Data - ) - except Exception as e: - logger.error(f"Error converting chat document to task document: {str(e)}") + logger.error(f"Error extracting from document: {str(e)}") raise diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py index b0dcedf8..40408ab8 100644 --- a/modules/workflow/managerWorkflow.py +++ b/modules/workflow/managerWorkflow.py @@ -5,11 +5,7 @@ import uuid from modules.interfaces.serviceAppClass import User -from modules.interfaces.serviceChatModel import ( - TaskStatus, UserInputRequest, ContentMetadata, ContentItem, - ChatDocument, TaskDocument, ExtractedContent, TaskItem, - TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow -) +from modules.interfaces.serviceChatModel import (UserInputRequest, ChatMessage, ChatWorkflow) from modules.interfaces.serviceChatClass import ChatInterface from modules.workflow.managerChat import ChatManager diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py index 51b02c67..75565874 100644 --- a/modules/workflow/processorDocument.py +++ b/modules/workflow/processorDocument.py @@ -11,15 +11,13 @@ from bs4 import BeautifulSoup from modules.interfaces.serviceChatModel import ( ChatDocument, - TaskDocument, ExtractedContent, ContentItem, ContentMetadata ) -from modules.interfaces.serviceManagementClass import ServiceManagement, getInterface -from modules.interfaces.serviceAppModel import User from modules.neutralizer.neutralizer import DataAnonymizer from modules.shared.configuration import APP_CONFIG +from modules.workflow.serviceContainer import ServiceContainer logger = logging.getLogger(__name__) @@ -35,14 +33,12 @@ class FileProcessingError(Exception): class DocumentProcessor: """Processor for handling document operations and content extraction.""" - def __init__(self, currentUser: Optional[User] = None): + def __init__(self, serviceContainer: ServiceContainer): """Initialize the document processor.""" - - self.serviceManagement = getInterface(currentUser) - + self.service = serviceContainer self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None - self.supportedTypes: Dict[str, Callable[[Union[ChatDocument, TaskDocument]], Awaitable[List[ContentItem]]]] = { + self.supportedTypes: Dict[str, Callable[[ChatDocument], Awaitable[List[ContentItem]]]] = { 'text/plain': self._processText, 'text/csv': self._processCsv, 'application/json': self._processJson, @@ -115,7 +111,18 @@ class DocumentProcessor: except ImportError as e: logger.warning(f"Image processing libraries could not be loaded: {e}") - async def processDocument(self, document: TaskDocument, prompt: str) -> ExtractedContent: + async def _getFileData(self, document: ChatDocument) -> bytes: + """Centralized function to get file data""" + try: + fileData = self.service.getFileData(document.fileId) + if fileData is None: + raise FileProcessingError(f"Could not get file data for {document.fileId}") + return fileData + except Exception as e: + logger.error(f"Error getting file data: {str(e)}") + raise FileProcessingError(f"Failed to get file data: {str(e)}") + + async def processDocument(self, document: ChatDocument, prompt: str) -> ExtractedContent: """ Process a document and extract its contents with AI processing. @@ -149,15 +156,13 @@ class DocumentProcessor: try: # Process each content item with AI processedItems = await self._aiDataExtraction(contentItems, prompt) - contentItems = processedItems - except Exception as e: logger.error(f"Error processing content with AI: {str(e)}") return ExtractedContent( objectId=document.id, - objectType="TaskDocument", + objectType="ChatDocument", contents=contentItems ) @@ -165,7 +170,7 @@ class DocumentProcessor: logger.error(f"Error processing document: {str(e)}") raise FileProcessingError(f"Failed to process document: {str(e)}") - def _detectContentType(self, document: Union[ChatDocument, TaskDocument]) -> str: + def _detectContentType(self, document: ChatDocument) -> str: """Detect content type from file content""" try: # Check file extension first @@ -195,32 +200,16 @@ class DocumentProcessor: if ext in extToMime: return extToMime[ext] - # Try to detect if it's text content - if isinstance(document, TaskDocument): - try: - content = base64.b64decode(document.data) - content.decode('utf-8') - return 'text/plain' - except UnicodeDecodeError: - pass - return 'application/octet-stream' except Exception as e: logger.error(f"Error detecting content type: {str(e)}") return 'application/octet-stream' - async def _processText(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processText(self, document: ChatDocument) -> List[ContentItem]: """Process text document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') return [ContentItem( label="main", data=content, @@ -235,17 +224,10 @@ class DocumentProcessor: logger.error(f"Error processing text document: {str(e)}") raise FileProcessingError(f"Failed to process text document: {str(e)}") - async def _processCsv(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processCsv(self, document: ChatDocument) -> List[ContentItem]: """Process CSV document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') return [ContentItem( label="main", data=content, @@ -260,17 +242,10 @@ class DocumentProcessor: logger.error(f"Error processing CSV document: {str(e)}") raise FileProcessingError(f"Failed to process CSV document: {str(e)}") - async def _processJson(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processJson(self, document: ChatDocument) -> List[ContentItem]: """Process JSON document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') # Parse JSON to validate jsonData = json.loads(content) @@ -288,17 +263,10 @@ class DocumentProcessor: logger.error(f"Error processing JSON document: {str(e)}") raise FileProcessingError(f"Failed to process JSON document: {str(e)}") - async def _processXml(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processXml(self, document: ChatDocument) -> List[ContentItem]: """Process XML document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') return [ContentItem( label="main", data=content, @@ -313,17 +281,10 @@ class DocumentProcessor: logger.error(f"Error processing XML document: {str(e)}") raise FileProcessingError(f"Failed to process XML document: {str(e)}") - async def _processHtml(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processHtml(self, document: ChatDocument) -> List[ContentItem]: """Process HTML document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') return [ContentItem( label="main", data=content, @@ -338,17 +299,10 @@ class DocumentProcessor: logger.error(f"Error processing HTML document: {str(e)}") raise FileProcessingError(f"Failed to process HTML document: {str(e)}") - async def _processSvg(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processSvg(self, document: ChatDocument) -> List[ContentItem]: """Process SVG document""" try: - if isinstance(document, TaskDocument): - content = base64.b64decode(document.data).decode('utf-8') - else: - content = self.serviceManagement.getFileData(document.fileId) - if content is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") - content = content.decode('utf-8') - + content = (await self._getFileData(document)).decode('utf-8') # Check if it's actually SVG isSvg = " List[ContentItem]: + async def _processImage(self, document: ChatDocument) -> List[ContentItem]: """Process image document""" try: self._loadImageProcessor() if not imageProcessorLoaded: raise FileProcessingError("Image processing libraries not available") - if isinstance(document, TaskDocument): - fileData = base64.b64decode(document.data) - else: - fileData = self.serviceManagement.getFileData(document.fileId) - if fileData is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") + fileData = await self._getFileData(document) with io.BytesIO(fileData) as imgStream: img = Image.open(imgStream) @@ -404,19 +353,14 @@ class DocumentProcessor: logger.error(f"Error processing image document: {str(e)}") raise FileProcessingError(f"Failed to process image document: {str(e)}") - async def _processPdf(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processPdf(self, document: ChatDocument) -> List[ContentItem]: """Process PDF document""" try: self._loadPdfExtractor() if not pdfExtractorLoaded: raise FileProcessingError("PDF extraction libraries not available") - if isinstance(document, TaskDocument): - fileData = base64.b64decode(document.data) - else: - fileData = self.serviceManagement.getFileData(document.fileId) - if fileData is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") + fileData = await self._getFileData(document) contentItems = [] @@ -480,19 +424,14 @@ class DocumentProcessor: logger.error(f"Error processing PDF document: {str(e)}") raise FileProcessingError(f"Failed to process PDF document: {str(e)}") - async def _processDocx(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processDocx(self, document: ChatDocument) -> List[ContentItem]: """Process Word document""" try: self._loadOfficeExtractor() if not officeExtractorLoaded: raise FileProcessingError("Office extraction libraries not available") - if isinstance(document, TaskDocument): - fileData = base64.b64decode(document.data) - else: - fileData = self.serviceManagement.getFileData(document.fileId) - if fileData is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") + fileData = await self._getFileData(document) with io.BytesIO(fileData) as docxStream: doc = docx.Document(docxStream) @@ -526,19 +465,14 @@ class DocumentProcessor: logger.error(f"Error processing Word document: {str(e)}") raise FileProcessingError(f"Failed to process Word document: {str(e)}") - async def _processXlsx(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processXlsx(self, document: ChatDocument) -> List[ContentItem]: """Process Excel document""" try: self._loadOfficeExtractor() if not officeExtractorLoaded: raise FileProcessingError("Office extraction libraries not available") - if isinstance(document, TaskDocument): - fileData = base64.b64decode(document.data) - else: - fileData = self.serviceManagement.getFileData(document.fileId) - if fileData is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") + fileData = await self._getFileData(document) contentItems = [] @@ -575,15 +509,10 @@ class DocumentProcessor: logger.error(f"Error processing Excel document: {str(e)}") raise FileProcessingError(f"Failed to process Excel document: {str(e)}") - async def _processBinary(self, document: Union[ChatDocument, TaskDocument]) -> List[ContentItem]: + async def _processBinary(self, document: ChatDocument) -> List[ContentItem]: """Process binary document""" try: - if isinstance(document, TaskDocument): - fileData = base64.b64decode(document.data) - else: - fileData = self.serviceManagement.getFileData(document.fileId) - if fileData is None: - raise FileProcessingError(f"Could not get file data for {document.fileId}") + fileData = await self._getFileData(document) return [ContentItem( label="binary", diff --git a/modules/workflow/serviceContainer.py b/modules/workflow/serviceContainer.py index 5b5f0a2f..263f4ad6 100644 --- a/modules/workflow/serviceContainer.py +++ b/modules/workflow/serviceContainer.py @@ -1,360 +1,224 @@ import logging +import importlib +import pkgutil +import inspect from typing import Dict, Any, List, Optional -from datetime import datetime, UTC -import json -import asyncio - -from modules.shared.configuration import APP_CONFIG -from modules.methods import MethodBase, MethodResult +from modules.interfaces.serviceAppClass import User from modules.interfaces.serviceChatModel import ( - TaskStatus, UserInputRequest, ContentMetadata, ContentItem, - ChatDocument, TaskDocument, ExtractedContent, TaskItem, - TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow + TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, + ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserConnection ) -from modules.interfaces.serviceManagementClass import ServiceManagement -from modules.interfaces.serviceChatClass import ChatInterface +from modules.interfaces.interfaceAi import interfaceAi +from modules.interfaces.serviceChatClass import getInterface as getChatInterface +from modules.interfaces.serviceManagementClass import getInterface as getFileInterface +from modules.workflow.managerDocument import DocumentManager +from modules.methods.methodBase import MethodBase logger = logging.getLogger(__name__) class ServiceContainer: - """Service container for dependency injection and service management.""" + """Service container that provides access to all services and their functions""" - def __init__(self, chatInterface: ChatInterface): - self.methods = {} - self.context = {} - self.workflow = None - self.model = {} - self.functions = {} - self.documentProcessor = None - self.state = { - 'status': TaskStatus.PENDING, - 'retryCount': 0, - 'retryMax': 3, - 'timeout': 300, # 5 minutes - 'lastError': None, - 'lastErrorTime': None - } - self.tasks: Dict[str, TaskItem] = {} # Will be populated with TaskItem instances + def __init__(self, currentUser: User, workflow: ChatWorkflow): + # Core services + self.user = currentUser + self.workflow = workflow + self.tasks = workflow.tasks + self.statusEnums = TaskStatus - # Initialize service management - self.serviceManagement = ServiceManagement() + # Initialize managers + self.interfaceChat = getChatInterface(currentUser) + self.interfaceFiles = getFileInterface(currentUser) + self.interfaceAi = interfaceAi() + self.documentManager = DocumentManager(self) - # Initialize file-related functions - self.functions = { - 'getFileData': self.serviceManagement.getFileData, - 'saveFileData': self.serviceManagement.saveFileData, - 'getFileMetadata': self.serviceManagement.getFileMetadata, - 'saveFileMetadata': self.serviceManagement.saveFileMetadata, - 'deleteFile': self.serviceManagement.deleteFile, - 'getFile': self.serviceManagement.getFile, - 'getMimeType': self.serviceManagement.getMimeType, - 'calculateFileHash': self.serviceManagement.calculateFileHash, - 'checkForDuplicateFile': self.serviceManagement.checkForDuplicateFile - } + # Initialize methods catalog + self.methods = None + # Discover additional methods + self._discoverMethods() - def initialize(self) -> None: - """Initialize service container""" - pass - - def registerMethod(self, methodName: str, methodInstance: Any) -> None: - """Register a new method""" - self.methods[methodName] = methodInstance - - def getMethod(self, methodName: str) -> Optional[Any]: - """Get a method by name""" - return self.methods.get(methodName) - - def removeMethod(self, methodName: str) -> None: - """Remove a method""" - self.methods.pop(methodName, None) - - def hasMethod(self, methodName: str) -> bool: - """Check if a method exists""" - return methodName in self.methods - - def listMethods(self) -> List[str]: - """List all registered methods""" - return list(self.methods.keys()) - - def getMethodInfo(self, methodName: str) -> Dict[str, Any]: - """Get method information""" - method = self.getMethod(methodName) - if not method: - return {} - return { - "name": methodName, - "description": self.getMethodDescription(methodName), - "version": self.getMethodVersion(methodName), - "author": self.getMethodAuthor(methodName), - "license": self.getMethodLicense(methodName), - "dependencies": self.getMethodDependencies(methodName), - "tags": self.getMethodTags(methodName), - "examples": self.getMethodExamples(methodName), - "documentation": self.getMethodDocumentation(methodName), - "source": self.getMethodSource(methodName), - "tests": self.getMethodTests(methodName), - "benchmarks": self.getMethodBenchmarks(methodName), - "metrics": self.getMethodMetrics(methodName), - "logs": self.getMethodLogs(methodName), - "history": self.getMethodHistory(methodName), - "usage": self.getMethodUsage(methodName), - "errors": self.getMethodErrors(methodName), - "warnings": self.getMethodWarnings(methodName) - } - - def getMethodSchema(self, methodName: str) -> Optional[Dict[str, Any]]: - """Get method schema""" - method = self.getMethod(methodName) - return method.schema if method else None - - def getMethodParameters(self, methodName: str) -> Optional[Dict[str, Any]]: - """Get method parameters""" - method = self.getMethod(methodName) - return method.parameters if method else None - - def getMethodReturnType(self, methodName: str) -> Optional[str]: - """Get method return type""" - method = self.getMethod(methodName) - return method.returnType if method else None - - def getMethodDescription(self, methodName: str) -> Optional[str]: - """Get method description""" - method = self.getMethod(methodName) - return method.description if method else None - - def getMethodVersion(self, methodName: str) -> Optional[str]: - """Get method version""" - method = self.getMethod(methodName) - return method.version if method else None - - def getMethodAuthor(self, methodName: str) -> Optional[str]: - """Get method author""" - method = self.getMethod(methodName) - return method.author if method else None - - def getMethodLicense(self, methodName: str) -> Optional[str]: - """Get method license""" - method = self.getMethod(methodName) - return method.license if method else None - - def getMethodDependencies(self, methodName: str) -> Optional[List[str]]: - """Get method dependencies""" - method = self.getMethod(methodName) - return method.dependencies if method else None - - def getMethodTags(self, methodName: str) -> Optional[List[str]]: - """Get method tags""" - method = self.getMethod(methodName) - return method.tags if method else None - - def getMethodExamples(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method examples""" - method = self.getMethod(methodName) - return method.examples if method else None - - def getMethodDocumentation(self, methodName: str) -> Optional[str]: - """Get method documentation""" - method = self.getMethod(methodName) - return method.documentation if method else None - - def getMethodSource(self, methodName: str) -> Optional[str]: - """Get method source""" - method = self.getMethod(methodName) - return method.source if method else None - - def getMethodTests(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method tests""" - method = self.getMethod(methodName) - return method.tests if method else None - - def getMethodBenchmarks(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method benchmarks""" - method = self.getMethod(methodName) - return method.benchmarks if method else None - - def getMethodMetrics(self, methodName: str) -> Optional[Dict[str, Any]]: - """Get method metrics""" - method = self.getMethod(methodName) - return method.metrics if method else None - - def getMethodLogs(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method logs""" - method = self.getMethod(methodName) - return method.logs if method else None - - def getMethodHistory(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method history""" - method = self.getMethod(methodName) - return method.history if method else None - - def getMethodUsage(self, methodName: str) -> Optional[Dict[str, Any]]: - """Get method usage""" - method = self.getMethod(methodName) - return method.usage if method else None - - def getMethodErrors(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method errors""" - method = self.getMethod(methodName) - return method.errors if method else None - - def getMethodWarnings(self, methodName: str) -> Optional[List[Dict[str, Any]]]: - """Get method warnings""" - method = self.getMethod(methodName) - return method.warnings if method else None - - def executeTask(self, task: Any) -> None: - """Execute a task""" + def _discoverMethods(self): + """Dynamically discover all method classes in modules.methods package""" try: - # Execute each action - for action in task.actionList: - method = self.getMethod(action.method) - if method: - method.executeAction(action.action, action.parameters) - - except Exception as e: - logger.error(f"Error executing task: {str(e)}") - raise + # Import the methods package + methodsPackage = importlib.import_module('modules.methods') + # Discover all modules in the package + for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): + if not isPkg and name.startswith('method'): + try: + # Import the module + module = importlib.import_module(f'modules.methods.{name}') + + # Find all classes in the module that inherit from MethodBase + for itemName, item in inspect.getmembers(module): + if (inspect.isclass(item) and + issubclass(item, MethodBase) and + item != MethodBase): + # Instantiate the method and add to service + methodInstance = item() + self.methods[methodInstance.name] = methodInstance + logger.info(f"Discovered method: {methodInstance.name}") + + except Exception as e: + logger.error(f"Error loading method module {name}: {str(e)}") + + except Exception as e: + logger.error(f"Error discovering methods: {str(e)}") + + # ===== Functions ===== + + def extractContent(self, prompt: str, document: ChatDocument) -> str: + """Extract content from document using prompt""" + return self.documentManager.extractContent(prompt, document) + + def getMethodsCatalog(self) -> Dict[str, Any]: + """Get catalog of available methods""" + return self.methods + + def getMethodsList(self) -> List[str]: + """Get list of available methods""" + return list(self.methods.keys()) + + def getDocumentReferenceList(self) -> Dict[str, List[Dict[str, str]]]: + """Get list of document references sorted by datetime""" + return { + "chat": self._getChatDocumentReferences(), + "history": self._getHistoryDocumentReferences() + } + + def getDocumentReferenceFromChatDocument(self, document: ChatDocument) -> str: + """Get document reference from ChatDocument""" + return f"document_{document.id}_{document.filename}" + + def getDocumentReferenceFromTaskResult(self, result: TaskResult) -> str: + """Get document reference from TaskResult""" + return f"documentList_{result.id}_{result.documentsLabel}" + + def getChatDocumentsFromDocumentReference(self, documentReference: str) -> List[ChatDocument]: + """Get ChatDocuments from document reference""" + return self.documentManager.getDocumentsByReference(documentReference) + + def getConnectionReferenceList(self) -> List[Dict[str, str]]: + """Get list of connection references sorted by authority""" + return self._getConnectionReferences() + + def getConnectionReferenceFromUserConnection(self, connection: UserConnection) -> str: + """Get connection reference from UserConnection""" + return f"connection_{connection.id}_{connection.authority}" + + def getUserConnectionFromConnectionReference(self, reference: str) -> UserConnection: + """Get UserConnection from connection reference""" + return self._getUserConnectionByReference(reference) + + def getMessageSummary(self, message: ChatMessage) -> Dict[str, List[Dict[str, Any]]]: + """Get message summary""" + return { + "chat": self._getChatMessageSummaries(), + "history": self._getHistoryMessageSummaries() + } + def getFileData(self, fileId: str) -> bytes: """Get file data by ID""" + return self.interfaceFiles.getFileData(fileId) + + def callAiBasic(self, prompt: str, context: str = None, complexityFlag: bool = False) -> str: + """Call basic AI service""" + return self.interfaceAi.callAiBasic(prompt, context, complexityFlag) + + def callAiImage(self, imageData: bytes, mimeType: str, prompt: str) -> str: + """Call AI image service""" + return self.interfaceAi.callAiImage(imageData, mimeType, prompt) + + def createFile(self, fileName: str, mimeType: str, content: bytes, base64encoded: bool = False) -> Dict[str, Any]: + """Create new file""" + return self.interfaceFiles.createFile(fileName, mimeType, content, base64encoded) + + def getFileInfo(self, fileId: str) -> Dict[str, Any]: + """Get file information""" + return self.interfaceFiles.getFileInfo(fileId) + + # ===== Private Methods ===== + + def _executeMethodAction(self, parameters: Dict[str, Any]) -> Any: + """Execute method action with parameters""" + method = parameters.get('method') + action = parameters.get('action') + if method in self.methods and action in self.methods[method]: + return self.methods[method][action](**parameters.get('parameters', {})) + raise ValueError(f"Unknown method or action: {method}.{action}") + + def _executeForEach(self, items: List[Any], action: callable) -> List[Any]: + """Execute forEach operation""" + results = [] + for item in items: + try: + result = action(item) + results.append(result) + except Exception as e: + logger.error(f"Error executing forEach action: {str(e)}") + results.append(None) + return results + + def _executeAiCall(self, prompt: str, documents: List[Dict[str, Any]]) -> List[Any]: + """Execute AI call with documents""" try: - # Get file data from storage - if hasattr(self.functions, 'getFileData'): - return self.functions.getFileData(fileId) - return b"" - + # Process each document + results = [] + for doc in documents: + content = self.extractContent(prompt, doc) + results.append(content) + return results except Exception as e: - logger.error(f"Error getting file data: {str(e)}") - return b"" - - def saveFileData(self, fileId: str, data: bytes) -> bool: - """Save file data by ID""" - try: - # Save file data to storage - if hasattr(self.functions, 'saveFileData'): - return self.functions.saveFileData(fileId, data) - return False - - except Exception as e: - logger.error(f"Error saving file data: {str(e)}") - return False - - def getFileMetadata(self, fileId: str) -> Dict[str, Any]: - """Get file metadata by ID""" - try: - # Get file metadata from storage - if hasattr(self.functions, 'getFileMetadata'): - return self.functions.getFileMetadata(fileId) - return {} - - except Exception as e: - logger.error(f"Error getting file metadata: {str(e)}") - return {} - - def saveFileMetadata(self, fileId: str, metadata: Dict[str, Any]) -> bool: - """Save file metadata by ID""" - try: - # Save file metadata to storage - if hasattr(self.functions, 'saveFileMetadata'): - return self.functions.saveFileMetadata(fileId, metadata) - return False - - except Exception as e: - logger.error(f"Error saving file metadata: {str(e)}") - return False - - async def executeTaskImproved(self, task: Any) -> None: # task: AgentTask - """Execute task with improved error handling and timeout""" - try: - # Check for timeout - if (datetime.now(UTC) - datetime.fromisoformat(task.startedAt)).seconds > self.state['timeout']: - task.status = TaskStatus.TIMEOUT - return - - # Execute actions - for action in task.actionList: - if not task.canExecuteAction(action): - if not task.getAuthData(action.authSource): - action.status = ActionStatus.FAILED - task.error = f"Missing authentication for {action.authSource}" - else: - action.status = ActionStatus.DEPENDENCY_FAILED - continue - - try: - # Get method - method = self.getMethod(action.method) - if not method: - raise ValueError(f"Unknown method: {action.method}") - - # Validate parameters - if not await method.validateParameters(action.action, action.parameters): - raise ValueError(f"Invalid parameters for {action.method}:{action.action}") - - # Get auth data if needed - authData = None - if action.authSource and action.authSource != "local": - authData = task.getAuthData(action.authSource) - if not authData: - raise ValueError(f"Missing authentication data for {action.authSource}") - - # Execute with timeout - result = await asyncio.wait_for( - method.execute(action.action, action.parameters, authData), - timeout=action.timeout or 60 - ) - - if result.success: - action.status = ActionStatus.SUCCESS - else: - if self._shouldRetry(result.data.get('error')): - action.retryCount += 1 - if action.retryCount > action.retryMax: - action.status = ActionStatus.FAILED - if action.rollbackOnFailure: - await method.rollback(action.action, action.parameters, authData) - else: - action.status = ActionStatus.RETRY - else: - action.status = ActionStatus.FAILED - if action.rollbackOnFailure: - await method.rollback(action.action, action.parameters, authData) - - except asyncio.TimeoutError: - action.status = ActionStatus.TIMEOUT - except Exception as e: - action.status = ActionStatus.FAILED - if action.rollbackOnFailure: - await method.rollback(action.action, action.parameters, authData) - - # Update task status - if task.hasFailed(): - task.status = TaskStatus.FAILED - elif task.isComplete(): - task.status = TaskStatus.SUCCESS - task.finishedAt = datetime.now(UTC).isoformat() - - except Exception as e: - task.status = TaskStatus.FAILED - task.error = str(e) - - def _shouldRetry(self, error: str) -> bool: - """Determine if error is retryable""" - retryableErrors = [ - "AI down", - "Document not found", - "Content extraction failed", - "Network error", - "Temporary failure" - ] - return any(err in error for err in retryableErrors) - - def getAvailableMethodsCatalog(self) -> Dict[str, Dict[str, Any]]: - """Get catalog of available methods and their actions""" - return { - name: { - "description": method.description, - "actions": method.actions - } - for name, method in self.methods.items() - } \ No newline at end of file + logger.error(f"Error executing AI call: {str(e)}") + return [] + + def _executeSharePointQuery(self, connection: str, site_query: str, file_query: str, content_query: str) -> List[Dict[str, str]]: + """Execute SharePoint query""" + # TODO: Implement SharePoint query + return [] + + def _executeSharePointDownload(self, connection: str, filepath: str) -> str: + """Execute SharePoint download""" + # TODO: Implement SharePoint download + return "" + + def _getChatDocumentReferences(self) -> List[Dict[str, str]]: + """Get chat document references""" + # TODO: Implement chat document references + return [] + + def _getHistoryDocumentReferences(self) -> List[Dict[str, str]]: + """Get history document references""" + # TODO: Implement history document references + return [] + + def _getConnectionReferences(self) -> List[Dict[str, str]]: + """Get connection references""" + # TODO: Implement connection references + return [] + + def _getUserConnectionByReference(self, reference: str) -> UserConnection: + """Get user connection by reference""" + # TODO: Implement user connection lookup + pass + + def _getChatMessageSummaries(self) -> List[Dict[str, Any]]: + """Get chat message summaries""" + # TODO: Implement chat message summaries + return [] + + def _getHistoryMessageSummaries(self) -> List[Dict[str, Any]]: + """Get history message summaries""" + # TODO: Implement history message summaries + return [] + +# Create singleton instance +serviceObject = None + +def initializeServiceContainer(currentUser: User, workflow: ChatWorkflow) -> ServiceContainer: + """Initialize the service container singleton""" + global serviceObject + if serviceObject is None: + serviceObject = ServiceContainer(currentUser, workflow) + return serviceObject diff --git a/notes/changelog.txt b/notes/changelog.txt index 057f9b92..21856505 100644 --- a/notes/changelog.txt +++ b/notes/changelog.txt @@ -1,6 +1,83 @@ -- all AI calls to route over AI-Module (AI basic, ai special, ai...) and to streamline -- service object to define correctly +- all AI calls to route over self service (AI basic, ai special, ai...) and to streamline +- integrate, then remove: + callAi(prompt, context) -> str + callAi4Image(prompt, context, imageData)-> str + _callAiBasic + _callAiAdvanced + +in manageChat +- not to manipulate workflow or messages --> to use functions from servicechatclass +- to add CRUD for tasks to workflow +- to move methods to methods class + +- service object to define correctly +- folder workflow modules --> solve problems types + + +Procedure for task generation: + +- prompt input: + - original user input (summary) + - prompt for task to do + - method list + - workflow's history (list of messageSummary) + - available documents (list of documentReference): + - available connections (list of connectionReference): + - instructions: + - rules for result: status (enum), feedback (to tell what is done and what needed next. Only to to the tasks, which are possible with the available methods, referencing and data, rest to do in next round) + - available data to use: list of documentReference, list of connectionReference + - methods usage: + - syntax: method.action([parameter:type])->resultLabel:type + - sequence of method.action to be the sequence ot processing + - as parameter only to use available items of documentReference and connectionReference or resultLabel of a previous method.action + - required result format: json for TaskResult, nothing else + +- result parsing: + - execute task actions by using data references stepwise, all documents in TaskResults to save as ChatDocuments, to be available for next action. + + +TODO: +- action output filename to be available as TaskResult with name to reference object and ChatDocument list + + +service (ServiceContainer) + user <-- currentUser + workflow <- workflow + tasks <- workflow.tasks + statusEnums <- serviceChatModel.TaskStatus + chatDatabase.* <-- serviceChatClass(user) + + methods + method.action([parameter:type])->resultLabel:type + + operator.forEach([items:List[item], action:method.action])->resultList:List[] + operator.aiCall([prompt:str, extractedDocumentContent:List[{"document":documentReference,"promptForContentExtraction":str}]])->resultList:List[] + + sharepoint.query([connectionReference:str, site_query:str, file_query:str, content_query:str])->resultList:List[{"filepath":str,"extractedContent":str}] + sharepoint.download([connectionReference:str, filepath:str])->documentReference:str + + functions + extractContent(prompt,documentReference):str <- managerDocument.extractContent(prompt,FilePreview) + getMethodsCatalog():{...} <- to import dynamically all methods from the files "method*.py" in folder modules/methods + getMethodsList():[str] <- to transform result from getMethodsCatalog into a list with the method items in format "method.action([parameter:type])->resultLabel:type" + + getDocumentReferenceList():{"chat":[{"documentReference":str,"datetime":str}],"history":[{"documentReference":str,"datetime":str}]} sorted by datetime desc + getDocumentReferenceFromChatDocument(ChatDocument):"document_"+ChatDocument.id+"_"+ChatDocument.filename + getDocumentReferenceFromTaskResult(TaskResult):"documentList_"+TaskResult.id+"_"+TaskResult.documentsLabel + getChatDocumentsFromDocumentReference(documentReference):List[ChatDocuments] + + getConnectionReferenceList():List[{"connectionReference":str,"authority":str}] sorted by authority + getConnectionReferenceFromUserConnection(UserConnection):"connection_"+UserConnection.id+"_"+UserConnection.authority + getUserConnectionFromConnectionReference(documentReference):UserConnection + + getMessageSummary(ChatMessage):{"chat":[{"messageSummary":str,"role":str,"success":bool,"sequenceNr":int}],"history":[{"messageSummary":str,"role":str,"success":bool,"sequenceNr":int}]} sorted by datetime desc + + getFileData(fileId) <- seriveManagementClass.getFileData(fileid) --> used by ManagerData + callAiBasic(prompt, context, complexityFlag) <- interfaceAi.callAiBasic --> used by managerChat + callAiImage(...) <- interfaceAi.callAiImage --> used by processorDocument + createFile(fileName, mimeType, content, base64encoded):FileItem <- seriveManagementClass.createFile, then seriveManagementClass.createFileData --> used by managerChat + getFileInfo(id):FileItem <- serviceManagementClass.getFile(id) --> used by managerChat diff --git a/test/pytest.ini b/test/pytest.ini deleted file mode 100644 index d9f16d89..00000000 --- a/test/pytest.ini +++ /dev/null @@ -1,3 +0,0 @@ -[pytest] -asyncio_mode = strict -asyncio_default_fixture_loop_scope = function \ No newline at end of file diff --git a/test/test_chat.py b/test/test_chat.py deleted file mode 100644 index ab30a5cc..00000000 --- a/test/test_chat.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -Test module for chat workflow functionality. -Tests the workflow process with analysis tasks. -""" - -import sys -import os -from pathlib import Path -from datetime import datetime, UTC -from unittest.mock import patch - -# Add gateway directory to Python path -gateway_dir = Path(__file__).parent.parent -sys.path.append(str(gateway_dir)) -sys.path.append(str(gateway_dir.parent)) - -from modules.workflow.workflowManager import WorkflowManager -from modules.interfaces.serviceChatModel import ChatWorkflow, UserInputRequest, ChatStat -from modules.workflow.chatManager import getChatManager -from modules.interfaces.serviceManagementModel import FileItem - -def test_workflow_process(): - # Initialize workflow manager - workflowManager = WorkflowManager() - - # Create test workflow - workflow = ChatWorkflow( - id="test-workflow", - mandateId="test-mandate", - status="running", - currentRound=1, - lastActivity=datetime.now(UTC).isoformat(), - startedAt=datetime.now(UTC).isoformat(), - messages=[], - stats=ChatStat(), - tasks=[] - ) - - # Initialize chat manager with workflow - chatManager = getChatManager() - chatManager.initialize(workflow) - - # Create mock file - mock_file = FileItem( - id="550e8400-e29b-41d4-a716-446655440000", - mandateId="test-mandate", - filename="test_file.txt", - mimeType="text/plain", - fileHash="test_hash", - fileSize=1024, - creationDate=datetime.now(UTC).isoformat() - ) - - # Mock the getFile function - with patch.object(chatManager.serviceManagement, 'getFile', return_value=mock_file): - # Create test user input - userInput = UserInputRequest( - prompt="Test prompt", - listFileId=["550e8400-e29b-41d4-a716-446655440000"] # UUID string - ) - - # Process workflow - result = workflowManager.workflowProcess(userInput, workflow) - - # Verify workflow completed successfully - assert result.status in ["completed", "failed"], f"Unexpected workflow status: {result.status}" - assert len(result.messages) > 0, "No messages were generated" - assert result.messages[-1].role == "assistant", "Last message should be from assistant" - -if __name__ == "__main__": - test_workflow_process() \ No newline at end of file