From 739f22785c90137784749975e7aaa0562556cf60 Mon Sep 17 00:00:00 2001 From: ValueOn AG
There was an error generating the analysis: {str(e)}
" - else: - content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}" - - return self.formatAgentDocumentOutput(outputLabel, content, contentType) - - def _getImageBase64(self, formatType: str = 'png') -> str: - """ - Convert current matplotlib figure to base64 string. - - Args: - formatType: Image format - - Returns: - Base64 encoded string of the image - """ - buffer = io.BytesIO() - plt.savefig(buffer, format=formatType, dpi=100) - buffer.seek(0) - imageData = buffer.getvalue() - buffer.close() - - # Convert to base64 - return base64.b64encode(imageData).decode('utf-8') - - async def _analyzeData(self, task: Dict[str, Any], analysisPlan: Dict[str, Any]) -> Dict[str, Any]: - """ - Analyze data based on the analysis plan. - - Args: - task: Task dictionary with input documents and specifications - analysisPlan: Analysis plan from _createAnalysisPlan - - Returns: - Analysis results dictionary - """ - try: - # Extract data from input documents - inputDocuments = task.get("inputDocuments", []) - datasets, documentContext = self._extractData(inputDocuments) - - # Get task information - prompt = task.get("prompt", "") - outputSpecs = task.get("outputSpecifications", []) - - # Analyze task requirements - analysisResults = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs) - - # Add datasets and context to results - analysisResults["datasets"] = datasets - analysisResults["documentContext"] = documentContext - - return analysisResults - - except Exception as e: - logger.error(f"Error analyzing data: {str(e)}", exc_info=True) - return { - "error": str(e), - "datasets": {}, - "documentContext": "" - } - - async def _createOutputDocuments(self, prompt: str, analysisResults: Dict[str, Any], - outputSpecs: List[Dict[str, Any]], analysisPlan: Dict[str, Any]) -> List[Dict[str, Any]]: - """ - Create output documents based on analysis results. - - Args: - prompt: Original task prompt - analysisResults: Results from data analysis - outputSpecs: List of output specifications - analysisPlan: Analysis plan from _createAnalysisPlan - - Returns: - List of document objects - """ - documents = [] - datasets = analysisResults.get("datasets", {}) - documentContext = analysisResults.get("documentContext", "") - - # Process each output specification - for spec in outputSpecs: - outputLabel = spec.get("label", "") - outputDescription = spec.get("description", "") - - # Determine format from filename - formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt" - - try: - # Create appropriate document based on format - if formatType in ["png", "jpg", "jpeg", "svg"]: - # Visualization output - document = await self._createVisualization( - datasets, prompt, outputLabel, analysisPlan, outputDescription - ) - elif formatType in ["csv", "json", "xlsx"]: - # Data document output - document = await self._createDataDocument( - datasets, prompt, outputLabel, analysisPlan, outputDescription - ) - else: - # Text document output (markdown, html, text) - document = await self._createTextDocument( - datasets, documentContext, prompt, outputLabel, formatType, - analysisPlan, outputDescription - ) - - documents.append(document) - - except Exception as e: - logger.error(f"Error creating output document {outputLabel}: {str(e)}", exc_info=True) - # Create error document - errorDoc = self.formatAgentDocumentOutput( - outputLabel, - f"Error creating document: {str(e)}", - "text/plain" - ) - documents.append(errorDoc) - - return documents - - -# Factory function for the Analyst agent -def getAgentAnalyst(): - """Returns an instance of the Analyst agent.""" - return AgentAnalyst() \ No newline at end of file diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py index db01cd62..a64bb7cd 100644 --- a/modules/interfaces/serviceChatModel.py +++ b/modules/interfaces/serviceChatModel.py @@ -3,12 +3,31 @@ Chat model classes for the chat system. """ from pydantic import BaseModel, Field -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Union from datetime import datetime import uuid from modules.shared.attributeUtils import register_model_labels, ModelMixin + +# USER MODELS + +class UserInputRequest(BaseModel, ModelMixin): + """Data model for a user input request""" + prompt: str = Field(description="Prompt for the user") + listFileId: List[str] = Field(default_factory=list, description="List of file IDs") + userLanguage: str = Field(default="en", description="User's preferred language") +# Register labels for UserInputRequest +register_model_labels( + "UserInputRequest", + {"en": "User Input Request", "fr": "Demande de saisie utilisateur"}, + { + "prompt": {"en": "Prompt", "fr": "Invite"}, + "listFileId": {"en": "File IDs", "fr": "IDs des fichiers"}, + "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"} + } +) + # WORKFLOW MODELS class ChatContent(BaseModel, ModelMixin): @@ -18,7 +37,6 @@ class ChatContent(BaseModel, ModelMixin): data: str = Field(description="The actual content data") mimeType: str = Field(description="MIME type of the content") metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") - # Register labels for ChatContent register_model_labels( "ChatContent", @@ -40,7 +58,6 @@ class ChatDocument(BaseModel, ModelMixin): fileSize: int = Field(description="Size of the file") mimeType: str = Field(description="MIME type of the file") contents: List[ChatContent] = Field(default_factory=list, description="List of chat contents") - # Register labels for ChatDocument register_model_labels( "ChatDocument", @@ -64,7 +81,6 @@ class ChatStat(BaseModel, ModelMixin): bytesReceived: Optional[int] = Field(None, description="Number of bytes received") successRate: Optional[float] = Field(None, description="Success rate of operations") errorCount: Optional[int] = Field(None, description="Number of errors encountered") - # Register labels for ChatStat register_model_labels( "ChatStat", @@ -91,7 +107,6 @@ class ChatLog(BaseModel, ModelMixin): status: str = Field(description="Status of the log entry") progress: Optional[int] = Field(None, description="Progress percentage") performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") - # Register labels for ChatLog register_model_labels( "ChatLog", @@ -124,7 +139,6 @@ class ChatMessage(BaseModel, ModelMixin): finishedAt: Optional[str] = Field(None, description="When the message processing finished") stats: Optional[ChatStat] = Field(None, description="Statistics for this message") success: Optional[bool] = Field(None, description="Whether the message processing was successful") - # Register labels for ChatMessage register_model_labels( "ChatMessage", @@ -146,7 +160,7 @@ register_model_labels( } ) -class Task(BaseModel, ModelMixin): +class AgentTask(BaseModel, ModelMixin): """Data model for a task""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") workflowId: str = Field(description="Foreign key to workflow") @@ -162,10 +176,9 @@ class Task(BaseModel, ModelMixin): startedAt: str = Field(description="When the task started") finishedAt: Optional[str] = Field(None, description="When the task finished") performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") - -# Register labels for Task +# Register labels for AgentTask register_model_labels( - "Task", + "AgentTask", {"en": "Task", "fr": "Tâche"}, { "id": {"en": "ID", "fr": "ID"}, @@ -185,6 +198,28 @@ register_model_labels( } ) +class Agent(BaseModel, ModelMixin): + """Data model for an agent""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") + name: str = Field(description="Name of the agent") + description: str = Field(description="Description of the agent") + capabilities: List[str] = Field(default_factory=list, description="List of agent capabilities") + performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") +# Register labels for Agent +register_model_labels( + "Agent", + {"en": "Agent", "fr": "Agent"}, + { + "id": {"en": "ID", "fr": "ID"}, + "name": {"en": "Name", "fr": "Nom"}, + "description": {"en": "Description", "fr": "Description"}, + "capabilities": {"en": "Capabilities", "fr": "Capacités"}, + "performance": {"en": "Performance", "fr": "Performance"} + } +) + +# WORKFLOW MODELS + class ChatWorkflow(BaseModel, ModelMixin): """Data model for a chat workflow""" id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") @@ -197,8 +232,7 @@ class ChatWorkflow(BaseModel, ModelMixin): logs: List[ChatLog] = Field(default_factory=list, description="Workflow logs") messages: List[ChatMessage] = Field(default_factory=list, description="Messages in the workflow") stats: Optional[ChatStat] = Field(None, description="Workflow statistics") - tasks: List[Task] = Field(default_factory=list, description="List of tasks in the workflow") - + tasks: List[AgentTask] = Field(default_factory=list, description="List of tasks in the workflow") # Register labels for ChatWorkflow register_model_labels( "ChatWorkflow", @@ -218,151 +252,124 @@ register_model_labels( } ) -# AGENT AND TASK MODELS +# DOCUMENT MODELS -class Agent(BaseModel, ModelMixin): - """Data model for an agent""" - id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key") - name: str = Field(description="Name of the agent") - description: str = Field(description="Description of the agent") - capabilities: List[str] = Field(default_factory=list, description="List of agent capabilities") - performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics") +class DocumentExtraction(BaseModel, ModelMixin): + """Data model for document extraction history""" + timestamp: str = Field(description="Timestamp of extraction") + type: str = Field(description="Type of document") + sections: List[str] = Field(default_factory=list, description="Extracted sections") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Extraction metadata") -# Register labels for Agent +# Register labels for DocumentExtraction register_model_labels( - "Agent", - {"en": "Agent", "fr": "Agent"}, + "DocumentExtraction", + {"en": "Document Extraction", "fr": "Extraction de document"}, + { + "timestamp": {"en": "Timestamp", "fr": "Horodatage"}, + "type": {"en": "Type", "fr": "Type"}, + "sections": {"en": "Sections", "fr": "Sections"}, + "metadata": {"en": "Metadata", "fr": "Métadonnées"} + } +) + +class DocumentContext(BaseModel, ModelMixin): + """Data model for document context""" + id: str = Field(description="Document ID") + extractionHistory: List[DocumentExtraction] = Field(default_factory=list, description="History of extractions") + relevantSections: List[str] = Field(default_factory=list, description="Relevant sections") + processingStatus: Dict[str, str] = Field(default_factory=dict, description="Processing status") + +# Register labels for DocumentContext +register_model_labels( + "DocumentContext", + {"en": "Document Context", "fr": "Contexte de document"}, + { + "id": {"en": "ID", "fr": "ID"}, + "extractionHistory": {"en": "Extraction History", "fr": "Historique d'extraction"}, + "relevantSections": {"en": "Relevant Sections", "fr": "Sections pertinentes"}, + "processingStatus": {"en": "Processing Status", "fr": "Statut de traitement"} + } +) + +class DocumentMetadata(BaseModel, ModelMixin): + """Data model for document metadata""" + type: str = Field(description="Document type") + format: str = Field(description="Document format") + size: int = Field(description="Document size in bytes") + pages: Optional[int] = Field(None, description="Number of pages") + sections: Optional[List[str]] = Field(None, description="Document sections") + error: Optional[str] = Field(None, description="Processing error if any") + +# Register labels for DocumentMetadata +register_model_labels( + "DocumentMetadata", + {"en": "Document Metadata", "fr": "Métadonnées de document"}, + { + "type": {"en": "Type", "fr": "Type"}, + "format": {"en": "Format", "fr": "Format"}, + "size": {"en": "Size", "fr": "Taille"}, + "pages": {"en": "Pages", "fr": "Pages"}, + "sections": {"en": "Sections", "fr": "Sections"}, + "error": {"en": "Error", "fr": "Erreur"} + } +) + +class ImageData(BaseModel, ModelMixin): + """Data model for image data""" + data: str = Field(description="Base64 encoded image data") + format: str = Field(description="Image format") + page: Optional[int] = Field(None, description="Page number if from a multi-page document") + index: Optional[int] = Field(None, description="Image index in the document") + +# Register labels for ImageData +register_model_labels( + "ImageData", + {"en": "Image Data", "fr": "Données d'image"}, + { + "data": {"en": "Image Data", "fr": "Données d'image"}, + "format": {"en": "Format", "fr": "Format"}, + "page": {"en": "Page", "fr": "Page"}, + "index": {"en": "Index", "fr": "Index"} + } +) + +class DocumentContent(BaseModel, ModelMixin): + """Data model for document content""" + text: Optional[str] = Field(None, description="Extracted text content") + data: Optional[Dict[str, Any]] = Field(None, description="Structured data content") + images: Optional[List[ImageData]] = Field(None, description="Extracted images") + metadata: DocumentMetadata = Field(description="Document metadata") + +# Register labels for DocumentContent +register_model_labels( + "DocumentContent", + {"en": "Document Content", "fr": "Contenu de document"}, + { + "text": {"en": "Text", "fr": "Texte"}, + "data": {"en": "Data", "fr": "Données"}, + "images": {"en": "Images", "fr": "Images"}, + "metadata": {"en": "Metadata", "fr": "Métadonnées"} + } +) + +class ProcessedDocument(BaseModel, ModelMixin): + """Data model for processed document""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Document ID") + name: str = Field(description="Document name") + contentType: str = Field(description="Content type") + content: DocumentContent = Field(description="Document content") + context: Optional[DocumentContext] = Field(None, description="Document context") + +# Register labels for ProcessedDocument +register_model_labels( + "ProcessedDocument", + {"en": "Processed Document", "fr": "Document traité"}, { "id": {"en": "ID", "fr": "ID"}, "name": {"en": "Name", "fr": "Nom"}, - "description": {"en": "Description", "fr": "Description"}, - "capabilities": {"en": "Capabilities", "fr": "Capacités"}, - "performance": {"en": "Performance", "fr": "Performance"} + "contentType": {"en": "Content Type", "fr": "Type de contenu"}, + "content": {"en": "Content", "fr": "Contenu"}, + "context": {"en": "Context", "fr": "Contexte"} } ) - -class AgentResponse(BaseModel, ModelMixin): - """Data model for an agent response""" - success: bool = Field(description="Whether the agent execution was successful") - message: ChatMessage = Field(description="Response message from the agent") - performance: Dict[str, Any] = Field(default_factory=dict, description="Performance metrics") - progress: float = Field(description="Task progress (0-100)") - -# Register labels for AgentResponse -register_model_labels( - "AgentResponse", - {"en": "Agent Response", "fr": "Réponse de l'agent"}, - { - "success": {"en": "Success", "fr": "Succès"}, - "message": {"en": "Message", "fr": "Message"}, - "performance": {"en": "Performance", "fr": "Performance"}, - "progress": {"en": "Progress", "fr": "Progression"} - } -) - -class TaskPlan(BaseModel, ModelMixin): - """Data model for a task plan""" - fileList: List[str] = Field(default_factory=list, description="List of files") - tasks: List[Task] = Field(default_factory=list, description="List of tasks in the plan") - userLanguage: str = Field(description="User's preferred language") - userResponse: str = Field(description="User's response or feedback") - -# Register labels for TaskPlan -register_model_labels( - "TaskPlan", - {"en": "Task Plan", "fr": "Plan de tâches"}, - { - "fileList": {"en": "File List", "fr": "Liste de fichiers"}, - "tasks": {"en": "Tasks", "fr": "Tâches"}, - "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"}, - "userResponse": {"en": "User Response", "fr": "Réponse de l'utilisateur"} - } -) - -class UserInputRequest(BaseModel, ModelMixin): - """Data model for a user input request""" - prompt: str = Field(description="Prompt for the user") - listFileId: List[int] = Field(default_factory=list, description="List of file IDs") - userLanguage: str = Field(default="en", description="User's preferred language") - -# Register labels for UserInputRequest -register_model_labels( - "UserInputRequest", - {"en": "User Input Request", "fr": "Demande de saisie utilisateur"}, - { - "prompt": {"en": "Prompt", "fr": "Invite"}, - "listFileId": {"en": "File IDs", "fr": "IDs des fichiers"}, - "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"} - } -) - -class AgentProfile(BaseModel, ModelMixin): - """Model for agent profile information.""" - id: str - name: str - description: str - capabilities: List[str] = Field(default_factory=list) - isAvailable: bool = True - lastActive: Optional[datetime] = None - stats: Optional[Dict[str, Any]] = None - -# Register labels for AgentProfile -register_model_labels( - "AgentProfile", - {"en": "Agent Profile", "fr": "Profil de l'agent"}, - { - "id": {"en": "ID", "fr": "ID"}, - "name": {"en": "Name", "fr": "Nom"}, - "description": {"en": "Description", "fr": "Description"}, - "capabilities": {"en": "Capabilities", "fr": "Capacités"}, - "isAvailable": {"en": "Available", "fr": "Disponible"}, - "lastActive": {"en": "Last Active", "fr": "Dernière activité"}, - "stats": {"en": "Statistics", "fr": "Statistiques"} - } -) - -class AgentHandover(BaseModel, ModelMixin): - """Data model for agent handover information.""" - # Status values - status: str = Field(default="pending", description="One of: pending, success, failed, retry") - error: Optional[str] = Field(None, description="Error message if any") - progress: float = Field(default=0.0, description="Progress percentage") - - # Document information - documentsUserInitial: List[Dict[str, Any]] = Field(default_factory=list, description="Initial user documents") - documentsInput: List[Dict[str, Any]] = Field(default_factory=list, description="Input documents") - documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents") - - # Prompt information - promptUserInitial: str = Field(default="", description="Initial user prompt") - promptFromFinishedAgent: str = Field(default="", description="Prompt from finished agent") - promptForNextAgent: str = Field(default="", description="Prompt for next agent") - - # Agent information - currentAgent: Optional[str] = Field(None, description="Current agent name") - nextAgent: Optional[str] = Field(None, description="Next agent name") - - # Timing information - startedAt: Optional[str] = Field(None, description="Start timestamp") - finishedAt: Optional[str] = Field(None, description="Finish timestamp") - -# Register labels for AgentHandover -register_model_labels( - "AgentHandover", - {"en": "Agent Handover", "fr": "Transfert d'agent"}, - { - "status": {"en": "Status", "fr": "Statut"}, - "error": {"en": "Error", "fr": "Erreur"}, - "progress": {"en": "Progress", "fr": "Progression"}, - "documentsUserInitial": {"en": "Initial User Documents", "fr": "Documents utilisateur initiaux"}, - "documentsInput": {"en": "Input Documents", "fr": "Documents d'entrée"}, - "documentsOutput": {"en": "Output Documents", "fr": "Documents de sortie"}, - "promptUserInitial": {"en": "Initial User Prompt", "fr": "Invite utilisateur initiale"}, - "promptFromFinishedAgent": {"en": "Finished Agent Prompt", "fr": "Invite de l'agent terminé"}, - "promptForNextAgent": {"en": "Next Agent Prompt", "fr": "Invite pour le prochain agent"}, - "currentAgent": {"en": "Current Agent", "fr": "Agent actuel"}, - "nextAgent": {"en": "Next Agent", "fr": "Prochain agent"}, - "startedAt": {"en": "Started At", "fr": "Démarré le"}, - "finishedAt": {"en": "Finished At", "fr": "Terminé le"} - } -) \ No newline at end of file diff --git a/modules/methods/methodBase.py b/modules/methods/methodBase.py new file mode 100644 index 00000000..7618b5a8 --- /dev/null +++ b/modules/methods/methodBase.py @@ -0,0 +1,74 @@ +from enum import Enum +from typing import Dict, List, Optional, Any, Literal +from datetime import datetime, UTC +from pydantic import BaseModel, Field + +class AuthSource(str, Enum): + LOCAL = "local" + MSFT = "msft" + GOOGLE = "google" + # Add more auth sources as needed + +class MethodParameter(BaseModel): + """Model for method parameters""" + name: str + type: str + required: bool + validation: Optional[callable] = None + description: str + +class MethodResult(BaseModel): + """Model for method results""" + success: bool + data: Dict[str, Any] + metadata: Dict[str, Any] = Field(default_factory=dict) + validation: List[str] = Field(default_factory=list) + +class MethodBase: + """Base class for all methods""" + + def __init__(self): + self.name: str + self.description: str + self.auth_source: AuthSource = AuthSource.LOCAL # Default to local auth + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + raise NotImplementedError + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute method action with authentication data""" + raise NotImplementedError + + async def validate_parameters(self, action: str, parameters: Dict[str, Any]) -> bool: + """Validate action parameters""" + if action not in self.actions: + return False + + action_def = self.actions[action] + required_params = {k for k, v in action_def['parameters'].items() if v['required']} + return all(param in parameters for param in required_params) + + async def rollback(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> None: + """Rollback action if needed""" + pass + + def _validate_auth(self, auth_data: Optional[Dict[str, Any]] = None) -> bool: + """Validate authentication data""" + if self.auth_source == AuthSource.LOCAL: + return True + return bool(auth_data and auth_data.get('source') == self.auth_source) + + def _create_result(self, success: bool, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> MethodResult: + """Create a method result""" + return MethodResult( + success=success, + data=data, + metadata=metadata or {}, + validation=[] + ) + + def _add_validation_message(self, result: MethodResult, message: str) -> None: + """Add a validation message to the result""" + result.validation.append(message) \ No newline at end of file diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py new file mode 100644 index 00000000..d39c1b08 --- /dev/null +++ b/modules/methods/methodCoder.py @@ -0,0 +1,272 @@ +from typing import Dict, Any, Optional +import logging +import ast +import re + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult + +logger = logging.getLogger(__name__) + +class MethodCoder(MethodBase): + """Coder method implementation for code operations""" + + def __init__(self): + super().__init__() + self.name = "coder" + self.description = "Handle code operations like analysis, generation, and refactoring" + self.auth_source = AuthSource.LOCAL # Code operations typically don't need auth + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "analyze": { + "description": "Analyze code structure and quality", + "retryMax": 2, + "timeout": 30, + "parameters": { + "code": {"type": "string", "required": True}, + "language": {"type": "string", "required": False}, + "metrics": {"type": "array", "items": "string", "required": False} + } + }, + "generate": { + "description": "Generate code based on requirements", + "retryMax": 2, + "timeout": 60, + "parameters": { + "requirements": {"type": "string", "required": True}, + "language": {"type": "string", "required": False}, + "style": {"type": "string", "required": False} + } + }, + "refactor": { + "description": "Refactor code for better quality", + "retryMax": 2, + "timeout": 60, + "parameters": { + "code": {"type": "string", "required": True}, + "language": {"type": "string", "required": False}, + "improvements": {"type": "array", "items": "string", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute coder method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Execute action + if action == "analyze": + return await self._analyze_code(parameters) + elif action == "generate": + return await self._generate_code(parameters) + elif action == "refactor": + return await self._refactor_code(parameters) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing coder {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _analyze_code(self, parameters: Dict[str, Any]) -> MethodResult: + """Analyze code structure and quality""" + try: + code = parameters["code"] + language = parameters.get("language", "python") + metrics = parameters.get("metrics", ["complexity", "style", "documentation"]) + + analysis = {} + + if language.lower() == "python": + # Parse Python code + try: + tree = ast.parse(code) + + # Calculate basic metrics + analysis["metrics"] = { + "lines": len(code.splitlines()), + "classes": len([node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]), + "functions": len([node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef)]), + "imports": len([node for node in ast.walk(tree) if isinstance(node, ast.Import) or isinstance(node, ast.ImportFrom)]) + } + + # Check for common issues + analysis["issues"] = [] + + # Check for missing docstrings + if "documentation" in metrics: + for node in ast.walk(tree): + if isinstance(node, (ast.ClassDef, ast.FunctionDef)) and not ast.get_docstring(node): + analysis["issues"].append({ + "type": "missing_docstring", + "line": node.lineno, + "name": node.name + }) + + # Check for long functions + if "complexity" in metrics: + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + body_lines = len(node.body) + if body_lines > 20: # Arbitrary threshold + analysis["issues"].append({ + "type": "long_function", + "line": node.lineno, + "name": node.name, + "lines": body_lines + }) + + # Check for style issues + if "style" in metrics: + # Check line length + for i, line in enumerate(code.splitlines(), 1): + if len(line) > 100: # PEP 8 recommendation + analysis["issues"].append({ + "type": "line_too_long", + "line": i, + "length": len(line) + }) + + # Check for mixed tabs and spaces + if "\t" in code and " " in code: + analysis["issues"].append({ + "type": "mixed_tabs_spaces", + "message": "Code mixes tabs and spaces" + }) + + except SyntaxError as e: + return self._create_result( + success=False, + data={"error": f"Syntax error: {str(e)}"} + ) + else: + # TODO: Implement analysis for other languages + return self._create_result( + success=False, + data={"error": f"Unsupported language: {language}"} + ) + + return self._create_result( + success=True, + data={ + "language": language, + "analysis": analysis + } + ) + except Exception as e: + logger.error(f"Error analyzing code: {e}") + return self._create_result( + success=False, + data={"error": f"Analysis failed: {str(e)}"} + ) + + async def _generate_code(self, parameters: Dict[str, Any]) -> MethodResult: + """Generate code based on requirements""" + try: + requirements = parameters["requirements"] + language = parameters.get("language", "python") + style = parameters.get("style", "standard") + + # TODO: Implement code generation using AI or templates + # This is a placeholder implementation + if language.lower() == "python": + # Generate a simple Python class based on requirements + class_name = re.sub(r'[^a-zA-Z0-9]', '', requirements.split()[0].title()) + code = f"""class {class_name}: + \"\"\" + {requirements} + \"\"\" + + def __init__(self): + pass + + def process(self): + pass +""" + else: + return self._create_result( + success=False, + data={"error": f"Unsupported language: {language}"} + ) + + return self._create_result( + success=True, + data={ + "language": language, + "code": code + } + ) + except Exception as e: + logger.error(f"Error generating code: {e}") + return self._create_result( + success=False, + data={"error": f"Generation failed: {str(e)}"} + ) + + async def _refactor_code(self, parameters: Dict[str, Any]) -> MethodResult: + """Refactor code for better quality""" + try: + code = parameters["code"] + language = parameters.get("language", "python") + improvements = parameters.get("improvements", ["style", "complexity"]) + + if language.lower() == "python": + # Parse Python code + try: + tree = ast.parse(code) + + # Apply improvements + if "style" in improvements: + # Format code (placeholder) + code = code.strip() + + if "complexity" in improvements: + # TODO: Implement complexity reduction + pass + + if "documentation" in improvements: + # Add missing docstrings + for node in ast.walk(tree): + if isinstance(node, (ast.ClassDef, ast.FunctionDef)) and not ast.get_docstring(node): + # TODO: Generate docstring + pass + + except SyntaxError as e: + return self._create_result( + success=False, + data={"error": f"Syntax error: {str(e)}"} + ) + else: + return self._create_result( + success=False, + data={"error": f"Unsupported language: {language}"} + ) + + return self._create_result( + success=True, + data={ + "language": language, + "code": code, + "improvements": improvements + } + ) + except Exception as e: + logger.error(f"Error refactoring code: {e}") + return self._create_result( + success=False, + data={"error": f"Refactoring failed: {str(e)}"} + ) \ No newline at end of file diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py new file mode 100644 index 00000000..26aab156 --- /dev/null +++ b/modules/methods/methodDocument.py @@ -0,0 +1,287 @@ +from typing import Dict, Any, Optional +import logging +import os +from pathlib import Path +import docx +import PyPDF2 +import json +import yaml +import xml.etree.ElementTree as ET +from datetime import datetime, UTC + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult + +logger = logging.getLogger(__name__) + +class MethodDocument(MethodBase): + """Document method implementation for document operations""" + + def __init__(self): + super().__init__() + self.name = "document" + self.description = "Handle document operations like reading, writing, and converting documents" + self.auth_source = AuthSource.LOCAL # Document operations typically don't need auth + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "read": { + "description": "Read document content", + "retryMax": 2, + "timeout": 30, + "parameters": { + "path": {"type": "string", "required": True}, + "format": {"type": "string", "required": False}, + "encoding": {"type": "string", "required": False}, + "includeMetadata": {"type": "boolean", "required": False} + } + }, + "write": { + "description": "Write content to document", + "retryMax": 2, + "timeout": 30, + "parameters": { + "path": {"type": "string", "required": True}, + "content": {"type": "string", "required": True}, + "format": {"type": "string", "required": False}, + "encoding": {"type": "string", "required": False}, + "template": {"type": "string", "required": False} + } + }, + "convert": { + "description": "Convert document between formats", + "retryMax": 2, + "timeout": 60, + "parameters": { + "sourcePath": {"type": "string", "required": True}, + "targetPath": {"type": "string", "required": True}, + "sourceFormat": {"type": "string", "required": False}, + "targetFormat": {"type": "string", "required": False}, + "options": {"type": "object", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute document method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Execute action + if action == "read": + return await self._read_document(parameters) + elif action == "write": + return await self._write_document(parameters) + elif action == "convert": + return await self._convert_document(parameters) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing document {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _read_document(self, parameters: Dict[str, Any]) -> MethodResult: + """Read document content""" + try: + path = Path(parameters["path"]) + if not path.exists(): + return self._create_result( + success=False, + data={"error": f"File not found: {path}"} + ) + + # Determine format if not specified + format = parameters.get("format") + if not format: + format = path.suffix[1:] if path.suffix else "txt" + + # Read content based on format + content = "" + encoding = parameters.get("encoding", "utf-8") + include_metadata = parameters.get("includeMetadata", False) + + if format.lower() in ["txt", "md"]: + with open(path, "r", encoding=encoding) as f: + content = f.read() + elif format.lower() == "docx": + doc = docx.Document(path) + content = "\n".join([paragraph.text for paragraph in doc.paragraphs]) + elif format.lower() == "pdf": + with open(path, "rb") as f: + pdf = PyPDF2.PdfReader(f) + content = "\n".join([page.extract_text() for page in pdf.pages]) + elif format.lower() == "json": + with open(path, "r", encoding=encoding) as f: + content = json.load(f) + elif format.lower() == "yaml": + with open(path, "r", encoding=encoding) as f: + content = yaml.safe_load(f) + elif format.lower() == "xml": + tree = ET.parse(path) + root = tree.getroot() + content = ET.tostring(root, encoding=encoding).decode(encoding) + else: + return self._create_result( + success=False, + data={"error": f"Unsupported format: {format}"} + ) + + result = { + "path": str(path), + "format": format, + "content": content + } + + if include_metadata: + result["metadata"] = { + "size": path.stat().st_size, + "modified": datetime.fromtimestamp(path.stat().st_mtime, UTC).isoformat(), + "created": datetime.fromtimestamp(path.stat().st_ctime, UTC).isoformat() + } + + return self._create_result( + success=True, + data=result + ) + except Exception as e: + logger.error(f"Error reading document: {e}") + return self._create_result( + success=False, + data={"error": f"Read failed: {str(e)}"} + ) + + async def _write_document(self, parameters: Dict[str, Any]) -> MethodResult: + """Write content to document""" + try: + path = Path(parameters["path"]) + + # Create directory if it doesn't exist + path.parent.mkdir(parents=True, exist_ok=True) + + # Determine format if not specified + format = parameters.get("format") + if not format: + format = path.suffix[1:] if path.suffix else "txt" + + # Write content based on format + encoding = parameters.get("encoding", "utf-8") + content = parameters["content"] + template = parameters.get("template") + + if format.lower() in ["txt", "md"]: + with open(path, "w", encoding=encoding) as f: + f.write(content) + elif format.lower() == "docx": + if template: + doc = docx.Document(template) + else: + doc = docx.Document() + doc.add_paragraph(content) + doc.save(path) + elif format.lower() == "pdf": + # TODO: Implement PDF writing + return self._create_result( + success=False, + data={"error": "PDF writing not implemented yet"} + ) + elif format.lower() == "json": + with open(path, "w", encoding=encoding) as f: + json.dump(content, f, indent=2) + elif format.lower() == "yaml": + with open(path, "w", encoding=encoding) as f: + yaml.dump(content, f) + elif format.lower() == "xml": + with open(path, "w", encoding=encoding) as f: + f.write(content) + else: + return self._create_result( + success=False, + data={"error": f"Unsupported format: {format}"} + ) + + return self._create_result( + success=True, + data={ + "path": str(path), + "format": format, + "size": path.stat().st_size, + "modified": datetime.now(UTC).isoformat() + } + ) + except Exception as e: + logger.error(f"Error writing document: {e}") + return self._create_result( + success=False, + data={"error": f"Write failed: {str(e)}"} + ) + + async def _convert_document(self, parameters: Dict[str, Any]) -> MethodResult: + """Convert document between formats""" + try: + source_path = Path(parameters["sourcePath"]) + target_path = Path(parameters["targetPath"]) + + if not source_path.exists(): + return self._create_result( + success=False, + data={"error": f"Source file not found: {source_path}"} + ) + + # Determine formats if not specified + source_format = parameters.get("sourceFormat") + if not source_format: + source_format = source_path.suffix[1:] if source_path.suffix else "txt" + + target_format = parameters.get("targetFormat") + if not target_format: + target_format = target_path.suffix[1:] if target_path.suffix else "txt" + + # Read source content + source_content = await self._read_document({ + "path": str(source_path), + "format": source_format + }) + + if not source_content.success: + return source_content + + # Write target content + target_content = await self._write_document({ + "path": str(target_path), + "content": source_content.data["content"], + "format": target_format + }) + + if not target_content.success: + return target_content + + return self._create_result( + success=True, + data={ + "sourcePath": str(source_path), + "targetPath": str(target_path), + "sourceFormat": source_format, + "targetFormat": target_format, + "size": target_path.stat().st_size, + "modified": datetime.now(UTC).isoformat() + } + ) + except Exception as e: + logger.error(f"Error converting document: {e}") + return self._create_result( + success=False, + data={"error": f"Conversion failed: {str(e)}"} + ) \ No newline at end of file diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py new file mode 100644 index 00000000..908cbb3f --- /dev/null +++ b/modules/methods/methodOutlook.py @@ -0,0 +1,203 @@ +from typing import Dict, Any, Optional +import logging +from datetime import datetime, UTC +from O365 import Account, MSGraphProtocol + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.models.userConnection import UserConnection + +logger = logging.getLogger(__name__) + +class MethodOutlook(MethodBase): + """Outlook method implementation for email operations""" + + def __init__(self): + super().__init__() + self.name = "outlook" + self.description = "Handle Outlook email operations like reading and sending emails" + self.auth_source = AuthSource.MICROSOFT + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "readMails": { + "description": "Read emails from Outlook", + "retryMax": 2, + "timeout": 30, + "parameters": { + "folder": {"type": "string", "required": False}, + "query": {"type": "string", "required": False}, + "maxResults": {"type": "number", "required": False}, + "includeAttachments": {"type": "boolean", "required": False} + } + }, + "sendMail": { + "description": "Send email through Outlook", + "retryMax": 2, + "timeout": 30, + "parameters": { + "to": {"type": "array", "items": "string", "required": True}, + "subject": {"type": "string", "required": True}, + "body": {"type": "string", "required": True}, + "cc": {"type": "array", "items": "string", "required": False}, + "bcc": {"type": "array", "items": "string", "required": False}, + "attachments": {"type": "array", "items": "string", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute Outlook method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Get UserConnection from auth_data + if not auth_data or "userConnection" not in auth_data: + return self._create_result( + success=False, + data={"error": "UserConnection required for Outlook operations"} + ) + + user_connection: UserConnection = auth_data["userConnection"] + + # Execute action + if action == "readMails": + return await self._read_mails(parameters, user_connection) + elif action == "sendMail": + return await self._send_mail(parameters, user_connection) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing Outlook {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _read_mails(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult: + """Read emails from Outlook""" + try: + folder = parameters.get("folder", "inbox") + query = parameters.get("query") + max_results = parameters.get("maxResults", 10) + include_attachments = parameters.get("includeAttachments", False) + + # Create Outlook account + account = Account( + credentials=(user_connection.authToken, user_connection.refreshToken), + protocol=MSGraphProtocol() + ) + + # Get mailbox + mailbox = account.mailbox() + + # Get folder + target_folder = mailbox.folder(folder_name=folder) + + # Get messages + if query: + messages = target_folder.get_messages(query=query, limit=max_results) + else: + messages = target_folder.get_messages(limit=max_results) + + # Process messages + results = [] + for message in messages: + msg_data = { + "id": message.object_id, + "subject": message.subject, + "from": message.sender.address, + "to": [to.address for to in message.to], + "cc": [cc.address for cc in message.cc], + "received": message.received.strftime("%Y-%m-%d %H:%M:%S"), + "body": message.body, + "hasAttachments": message.has_attachments + } + + if include_attachments and message.has_attachments: + attachments = [] + for attachment in message.attachments: + attachments.append({ + "name": attachment.name, + "contentType": attachment.content_type, + "size": attachment.size + }) + msg_data["attachments"] = attachments + + results.append(msg_data) + + return self._create_result( + success=True, + data={ + "folder": folder, + "query": query, + "messages": results + } + ) + except Exception as e: + logger.error(f"Error reading Outlook emails: {e}") + return self._create_result( + success=False, + data={"error": f"Read failed: {str(e)}"} + ) + + async def _send_mail(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult: + """Send email through Outlook""" + try: + to_addresses = parameters["to"] + subject = parameters["subject"] + body = parameters["body"] + cc_addresses = parameters.get("cc", []) + bcc_addresses = parameters.get("bcc", []) + attachments = parameters.get("attachments", []) + + # Create Outlook account + account = Account( + credentials=(user_connection.authToken, user_connection.refreshToken), + protocol=MSGraphProtocol() + ) + + # Get mailbox + mailbox = account.mailbox() + + # Create new message + message = mailbox.new_message() + message.to.add(to_addresses) + if cc_addresses: + message.cc.add(cc_addresses) + if bcc_addresses: + message.bcc.add(bcc_addresses) + message.subject = subject + message.body = body + + # Add attachments + for attachment_path in attachments: + message.attachments.add(attachment_path) + + # Send message + message.send() + + return self._create_result( + success=True, + data={ + "to": to_addresses, + "subject": subject, + "sent": datetime.now(UTC).isoformat() + } + ) + except Exception as e: + logger.error(f"Error sending Outlook email: {e}") + return self._create_result( + success=False, + data={"error": f"Send failed: {str(e)}"} + ) \ No newline at end of file diff --git a/modules/methods/methodPowerpoint.py b/modules/methods/methodPowerpoint.py new file mode 100644 index 00000000..bed7abc9 --- /dev/null +++ b/modules/methods/methodPowerpoint.py @@ -0,0 +1,199 @@ +from typing import Dict, Any, Optional +import logging +import os +from pathlib import Path + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult + +logger = logging.getLogger(__name__) + +class MethodPowerpoint(MethodBase): + """Powerpoint method implementation for PowerPoint operations""" + + def __init__(self): + super().__init__() + self.name = "powerpoint" + self.description = "Handle PowerPoint operations like reading, writing, and converting presentations" + self.auth_source = AuthSource.MICROSOFT # PowerPoint operations need Microsoft auth + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "read": { + "description": "Read PowerPoint presentation content", + "retryMax": 2, + "timeout": 30, + "parameters": { + "path": {"type": "string", "required": True}, + "format": {"type": "string", "required": False}, + "includeNotes": {"type": "boolean", "required": False} + } + }, + "write": { + "description": "Write content to PowerPoint presentation", + "retryMax": 2, + "timeout": 60, + "parameters": { + "path": {"type": "string", "required": True}, + "content": {"type": "object", "required": True}, + "template": {"type": "string", "required": False} + } + }, + "convert": { + "description": "Convert PowerPoint presentation between formats", + "retryMax": 2, + "timeout": 60, + "parameters": { + "sourcePath": {"type": "string", "required": True}, + "targetPath": {"type": "string", "required": True}, + "sourceFormat": {"type": "string", "required": False}, + "targetFormat": {"type": "string", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute powerpoint method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Validate authentication + if not await self.validate_auth(auth_data): + return self._create_result( + success=False, + data={"error": "Authentication required for PowerPoint operations"} + ) + + # Execute action + if action == "read": + return await self._read_presentation(parameters, auth_data) + elif action == "write": + return await self._write_presentation(parameters, auth_data) + elif action == "convert": + return await self._convert_presentation(parameters, auth_data) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing powerpoint {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _read_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult: + """Read PowerPoint presentation content""" + try: + path = Path(parameters["path"]) + if not path.exists(): + return self._create_result( + success=False, + data={"error": f"File not found: {path}"} + ) + + # Determine format if not specified + format = parameters.get("format") + if not format: + format = path.suffix[1:] if path.suffix else "pptx" + + # TODO: Implement PowerPoint reading using Microsoft Graph API + # This is a placeholder implementation + return self._create_result( + success=True, + data={ + "path": str(path), + "format": format, + "slides": [ + { + "number": 1, + "title": "Example Slide", + "content": "Example content", + "notes": "Example notes" if parameters.get("includeNotes", False) else None + } + ] + } + ) + except Exception as e: + logger.error(f"Error reading presentation: {e}") + return self._create_result( + success=False, + data={"error": f"Read failed: {str(e)}"} + ) + + async def _write_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult: + """Write content to PowerPoint presentation""" + try: + path = Path(parameters["path"]) + + # Create directory if it doesn't exist + path.parent.mkdir(parents=True, exist_ok=True) + + # Determine format if not specified + format = parameters.get("format") + if not format: + format = path.suffix[1:] if path.suffix else "pptx" + + # TODO: Implement PowerPoint writing using Microsoft Graph API + # This is a placeholder implementation + return self._create_result( + success=True, + data={ + "path": str(path), + "format": format, + "slides": len(parameters["content"].get("slides", [])) + } + ) + except Exception as e: + logger.error(f"Error writing presentation: {e}") + return self._create_result( + success=False, + data={"error": f"Write failed: {str(e)}"} + ) + + async def _convert_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult: + """Convert PowerPoint presentation between formats""" + try: + source_path = Path(parameters["sourcePath"]) + target_path = Path(parameters["targetPath"]) + + if not source_path.exists(): + return self._create_result( + success=False, + data={"error": f"Source file not found: {source_path}"} + ) + + # Determine formats if not specified + source_format = parameters.get("sourceFormat") + if not source_format: + source_format = source_path.suffix[1:] if source_path.suffix else "pptx" + + target_format = parameters.get("targetFormat") + if not target_format: + target_format = target_path.suffix[1:] if target_path.suffix else "pptx" + + # TODO: Implement PowerPoint conversion using Microsoft Graph API + # This is a placeholder implementation + return self._create_result( + success=True, + data={ + "sourcePath": str(source_path), + "targetPath": str(target_path), + "sourceFormat": source_format, + "targetFormat": target_format + } + ) + except Exception as e: + logger.error(f"Error converting presentation: {e}") + return self._create_result( + success=False, + data={"error": f"Conversion failed: {str(e)}"} + ) \ No newline at end of file diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py new file mode 100644 index 00000000..893dccfa --- /dev/null +++ b/modules/methods/methodSharepoint.py @@ -0,0 +1,217 @@ +from typing import Dict, Any, Optional +import logging +from datetime import datetime, UTC +from office365.runtime.auth.user_credential import UserCredential +from office365.sharepoint.client_context import ClientContext +from office365.sharepoint.files.file import File +from office365.sharepoint.lists.list import List +from office365.sharepoint.lists.list_creation_information import ListCreationInformation + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.models.userConnection import UserConnection + +logger = logging.getLogger(__name__) + +class MethodSharepoint(MethodBase): + """SharePoint method implementation for document operations""" + + def __init__(self): + super().__init__() + self.name = "sharepoint" + self.description = "Handle SharePoint document operations like search, read, and write" + self.auth_source = AuthSource.MICROSOFT + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "search": { + "description": "Search SharePoint documents", + "retryMax": 3, + "timeout": 30, + "parameters": { + "query": {"type": "string", "required": True}, + "siteUrl": {"type": "string", "required": True}, + "listName": {"type": "string", "required": False}, + "maxResults": {"type": "number", "required": False} + } + }, + "read": { + "description": "Read SharePoint document content", + "retryMax": 2, + "timeout": 30, + "parameters": { + "fileUrl": {"type": "string", "required": True}, + "siteUrl": {"type": "string", "required": True} + } + }, + "write": { + "description": "Write content to SharePoint document", + "retryMax": 2, + "timeout": 30, + "parameters": { + "fileUrl": {"type": "string", "required": True}, + "siteUrl": {"type": "string", "required": True}, + "content": {"type": "string", "required": True}, + "contentType": {"type": "string", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute SharePoint method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Get UserConnection from auth_data + if not auth_data or "userConnection" not in auth_data: + return self._create_result( + success=False, + data={"error": "UserConnection required for SharePoint operations"} + ) + + user_connection: UserConnection = auth_data["userConnection"] + + # Execute action + if action == "search": + return await self._search_documents(parameters, user_connection) + elif action == "read": + return await self._read_document(parameters, user_connection) + elif action == "write": + return await self._write_document(parameters, user_connection) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing SharePoint {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _search_documents(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult: + """Search SharePoint documents""" + try: + site_url = parameters["siteUrl"] + query = parameters["query"] + list_name = parameters.get("listName") + max_results = parameters.get("maxResults", 10) + + # Create SharePoint context + ctx = ClientContext(site_url).with_credentials( + UserCredential(user_connection.authToken, user_connection.refreshToken) + ) + + # Search in specific list or entire site + if list_name: + target_list = ctx.web.lists.get_by_title(list_name) + items = target_list.items.filter(f"Title eq '{query}'").top(max_results).get().execute_query() + results = [{ + "title": item.properties["Title"], + "url": item.properties["FileRef"], + "modified": item.properties["Modified"], + "created": item.properties["Created"] + } for item in items] + else: + # Search entire site + search_results = ctx.search(query).execute_query() + results = [{ + "title": result.properties["Title"], + "url": result.properties["Path"], + "modified": result.properties["LastModifiedTime"], + "created": result.properties["Created"] + } for result in search_results[:max_results]] + + return self._create_result( + success=True, + data={ + "query": query, + "results": results + } + ) + except Exception as e: + logger.error(f"Error searching SharePoint documents: {e}") + return self._create_result( + success=False, + data={"error": f"Search failed: {str(e)}"} + ) + + async def _read_document(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult: + """Read SharePoint document content""" + try: + site_url = parameters["siteUrl"] + file_url = parameters["fileUrl"] + + # Create SharePoint context + ctx = ClientContext(site_url).with_credentials( + UserCredential(user_connection.authToken, user_connection.refreshToken) + ) + + # Get file + file = ctx.web.get_file_by_server_relative_url(file_url) + file_content = file.read().execute_query() + + return self._create_result( + success=True, + data={ + "url": file_url, + "content": file_content.content.decode('utf-8'), + "modified": file.properties["TimeLastModified"], + "size": file.properties["Length"] + } + ) + except Exception as e: + logger.error(f"Error reading SharePoint document: {e}") + return self._create_result( + success=False, + data={"error": f"Read failed: {str(e)}"} + ) + + async def _write_document(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult: + """Write content to SharePoint document""" + try: + site_url = parameters["siteUrl"] + file_url = parameters["fileUrl"] + content = parameters["content"] + content_type = parameters.get("contentType", "text/plain") + + # Create SharePoint context + ctx = ClientContext(site_url).with_credentials( + UserCredential(user_connection.authToken, user_connection.refreshToken) + ) + + # Get or create file + try: + file = ctx.web.get_file_by_server_relative_url(file_url) + except: + # Create new file + folder_url = "/".join(file_url.split("/")[:-1]) + file_name = file_url.split("/")[-1] + folder = ctx.web.get_folder_by_server_relative_url(folder_url) + file = folder.upload_file(file_name, content.encode('utf-8')).execute_query() + + # Update file content + file.write(content.encode('utf-8')).execute_query() + + return self._create_result( + success=True, + data={ + "url": file_url, + "modified": datetime.now(UTC).isoformat(), + "size": len(content.encode('utf-8')) + } + ) + except Exception as e: + logger.error(f"Error writing SharePoint document: {e}") + return self._create_result( + success=False, + data={"error": f"Write failed: {str(e)}"} + ) \ No newline at end of file diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py new file mode 100644 index 00000000..cf968ed4 --- /dev/null +++ b/modules/methods/methodWeb.py @@ -0,0 +1,398 @@ +from typing import Dict, Any, Optional +import logging +import aiohttp +import asyncio +from bs4 import BeautifulSoup +from urllib.parse import urljoin, urlparse +import re +from datetime import datetime, UTC +import requests +import time + +from modules.methods.methodBase import MethodBase, AuthSource, MethodResult +from modules.shared.configuration import APP_CONFIG + +logger = logging.getLogger(__name__) + +class MethodWeb(MethodBase): + """Web method implementation for web operations""" + + def __init__(self): + super().__init__() + self.name = "web" + self.description = "Handle web operations like search, crawl, and content extraction" + self.auth_source = AuthSource.LOCAL # Web operations typically don't need auth + + # Web crawling configuration from agentWebcrawler + self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "") + self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google") + self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto") + self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5")) + self.timeout = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_TIMEOUT", "30")) + self.userAgent = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") + + if not self.srcApikey: + logger.error("SerpAPI key not configured") + + @property + def actions(self) -> Dict[str, Dict[str, Any]]: + """Available actions and their parameters""" + return { + "search": { + "description": "Search web content", + "retryMax": 3, + "timeout": 30, + "parameters": { + "query": {"type": "string", "required": True}, + "maxResults": {"type": "number", "required": False}, + "filters": {"type": "object", "required": False}, + "searchEngine": {"type": "string", "required": False} + } + }, + "crawl": { + "description": "Crawl web pages", + "retryMax": 2, + "timeout": 60, + "parameters": { + "url": {"type": "string", "required": True}, + "depth": {"type": "number", "required": False}, + "followLinks": {"type": "boolean", "required": False}, + "includeImages": {"type": "boolean", "required": False}, + "respectRobots": {"type": "boolean", "required": False} + } + }, + "extract": { + "description": "Extract content from web page", + "retryMax": 2, + "timeout": 30, + "parameters": { + "url": {"type": "string", "required": True}, + "selectors": {"type": "array", "items": "string", "required": False}, + "format": {"type": "string", "required": False}, + "includeMetadata": {"type": "boolean", "required": False} + } + } + } + + async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult: + """Execute web method""" + try: + # Validate parameters + if not await self.validate_parameters(action, parameters): + return self._create_result( + success=False, + data={"error": f"Invalid parameters for {action}"} + ) + + # Execute action + if action == "search": + return await self._search_web(parameters) + elif action == "crawl": + return await self._crawl_page(parameters) + elif action == "extract": + return await self._extract_content(parameters) + else: + return self._create_result( + success=False, + data={"error": f"Unknown action: {action}"} + ) + + except Exception as e: + logger.error(f"Error executing web {action}: {e}") + return self._create_result( + success=False, + data={"error": str(e)} + ) + + async def _search_web(self, parameters: Dict[str, Any]) -> MethodResult: + """Search web content""" + try: + query = parameters["query"] + max_results = parameters.get("maxResults", 10) + filters = parameters.get("filters", {}) + search_engine = parameters.get("searchEngine", "google") + + # Implement search using different engines + if search_engine.lower() == "google": + # Use Google Custom Search API + # TODO: Implement Google Custom Search API integration + results = await self._google_search(query, max_results, filters) + elif search_engine.lower() == "bing": + # Use Bing Web Search API + # TODO: Implement Bing Web Search API integration + results = await self._bing_search(query, max_results, filters) + else: + return self._create_result( + success=False, + data={"error": f"Unsupported search engine: {search_engine}"} + ) + + return self._create_result( + success=True, + data={ + "query": query, + "engine": search_engine, + "results": results + } + ) + except Exception as e: + logger.error(f"Error searching web: {e}") + return self._create_result( + success=False, + data={"error": f"Search failed: {str(e)}"} + ) + + async def _google_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list: + """Search using Google Custom Search API""" + # TODO: Implement Google Custom Search API + # This is a placeholder implementation + return [ + { + "title": "Example Result", + "url": "https://example.com", + "snippet": "Example search result snippet", + "source": "google" + } + ] + + async def _bing_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list: + """Search using Bing Web Search API""" + # TODO: Implement Bing Web Search API + # This is a placeholder implementation + return [ + { + "title": "Example Result", + "url": "https://example.com", + "snippet": "Example search result snippet", + "source": "bing" + } + ] + + async def _crawl_page(self, parameters: Dict[str, Any]) -> MethodResult: + """Crawl web pages""" + try: + url = parameters["url"] + depth = parameters.get("depth", 1) + follow_links = parameters.get("followLinks", False) + include_images = parameters.get("includeImages", False) + respect_robots = parameters.get("respectRobots", True) + + # Check robots.txt if required + if respect_robots: + if not await self._check_robots_txt(url): + return self._create_result( + success=False, + data={"error": "Crawling not allowed by robots.txt"} + ) + + # Crawl the page + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + html = await response.text() + soup = BeautifulSoup(html, 'html.parser') + + # Extract basic information + result = { + "url": url, + "title": soup.title.string if soup.title else None, + "description": self._get_meta_description(soup), + "links": [], + "images": [] if include_images else None, + "text": soup.get_text(strip=True), + "crawled": datetime.now(UTC).isoformat() + } + + # Extract links if followLinks is True + if follow_links: + base_url = url + for link in soup.find_all('a'): + href = link.get('href') + if href: + absolute_url = urljoin(base_url, href) + if self._is_valid_url(absolute_url): + result["links"].append({ + "url": absolute_url, + "text": link.get_text(strip=True) + }) + + # Extract images if includeImages is True + if include_images: + for img in soup.find_all('img'): + src = img.get('src') + if src: + absolute_src = urljoin(url, src) + result["images"].append({ + "url": absolute_src, + "alt": img.get('alt', ''), + "title": img.get('title', '') + }) + + return self._create_result( + success=True, + data=result + ) + else: + return self._create_result( + success=False, + data={"error": f"Failed to fetch URL: {response.status}"} + ) + except Exception as e: + logger.error(f"Error crawling page: {e}") + return self._create_result( + success=False, + data={"error": f"Crawl failed: {str(e)}"} + ) + + async def _extract_content(self, parameters: Dict[str, Any]) -> MethodResult: + """Extract content from web page""" + try: + url = parameters["url"] + selectors = parameters.get("selectors") + format = parameters.get("format", "text") + include_metadata = parameters.get("includeMetadata", False) + + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + if response.status == 200: + html = await response.text() + soup = BeautifulSoup(html, 'html.parser') + + # Extract content based on selectors + content = {} + if selectors: + for selector in selectors: + elements = soup.select(selector) + content[selector] = [elem.get_text() for elem in elements] + else: + # Default extraction + content = { + "title": soup.title.string if soup.title else None, + "text": soup.get_text(strip=True), + "links": [a.get('href') for a in soup.find_all('a')] + } + + # Add metadata if requested + if include_metadata: + content["metadata"] = { + "url": url, + "crawled": datetime.now(UTC).isoformat(), + "language": self._detect_language(soup), + "wordCount": len(content["text"].split()), + "linksCount": len(content["links"]) + } + + return self._create_result( + success=True, + data={ + "url": url, + "content": content + } + ) + else: + return self._create_result( + success=False, + data={"error": f"Failed to fetch URL: {response.status}"} + ) + except Exception as e: + logger.error(f"Error extracting content: {e}") + return self._create_result( + success=False, + data={"error": f"Extraction failed: {str(e)}"} + ) + + def _get_meta_description(self, soup: BeautifulSoup) -> Optional[str]: + """Extract meta description from HTML""" + meta_desc = soup.find('meta', attrs={'name': 'description'}) + if meta_desc: + return meta_desc.get('content') + return None + + def _is_valid_url(self, url: str) -> bool: + """Check if URL is valid""" + try: + result = urlparse(url) + return all([result.scheme, result.netloc]) + except: + return False + + async def _check_robots_txt(self, url: str) -> bool: + """Check if URL is allowed by robots.txt""" + try: + parsed_url = urlparse(url) + robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt" + + async with aiohttp.ClientSession() as session: + async with session.get(robots_url, headers={"User-Agent": self.userAgent}, timeout=self.timeout) as response: + if response.status == 200: + robots_content = await response.text() + + # Parse robots.txt content + user_agent = "*" # Default to all user agents + disallow_paths = [] + + for line in robots_content.splitlines(): + line = line.strip().lower() + if line.startswith("user-agent:"): + user_agent = line[11:].strip() + elif line.startswith("disallow:") and user_agent in ["*", self.userAgent.lower()]: + path = line[9:].strip() + if path: + disallow_paths.append(path) + + # Check if URL path is disallowed + url_path = parsed_url.path + for disallow_path in disallow_paths: + if url_path.startswith(disallow_path): + return False + + return True + else: + # If robots.txt doesn't exist, assume crawling is allowed + return True + + except Exception as e: + logger.warning(f"Error checking robots.txt for {url}: {str(e)}") + # If there's an error, assume crawling is allowed + return True + + def _detect_language(self, soup: BeautifulSoup) -> str: + """Detect page language""" + try: + # Try to get language from HTML lang attribute + if soup.html and soup.html.get('lang'): + return soup.html.get('lang') + + # Try to get language from meta tag + meta_lang = soup.find('meta', attrs={'http-equiv': 'content-language'}) + if meta_lang: + return meta_lang.get('content', 'en') + + # Try to get language from meta charset + meta_charset = soup.find('meta', attrs={'charset': True}) + if meta_charset: + charset = meta_charset.get('charset', '').lower() + if 'utf-8' in charset: + return 'en' # Default to English for UTF-8 + + # Try to detect language from content + # This is a simple heuristic based on common words + text = soup.get_text().lower() + common_words = { + 'en': ['the', 'and', 'of', 'to', 'in', 'is', 'that', 'for', 'it', 'with'], + 'es': ['el', 'la', 'los', 'las', 'de', 'y', 'en', 'que', 'por', 'con'], + 'fr': ['le', 'la', 'les', 'de', 'et', 'en', 'que', 'pour', 'avec', 'dans'], + 'de': ['der', 'die', 'das', 'und', 'in', 'den', 'von', 'zu', 'für', 'mit'] + } + + word_counts = {lang: sum(1 for word in words if f' {word} ' in f' {text} ') + for lang, words in common_words.items()} + + if word_counts: + return max(word_counts.items(), key=lambda x: x[1])[0] + + return 'en' # Default to English if no language detected + + except Exception as e: + logger.warning(f"Error detecting language: {str(e)}") + return 'en' # Default to English on error \ No newline at end of file diff --git a/modules/workflow/agentBase.py b/modules/workflow/agentBase.py deleted file mode 100644 index 0c79681a..00000000 --- a/modules/workflow/agentBase.py +++ /dev/null @@ -1,214 +0,0 @@ -""" -Agent Base Module. -Provides the base class for all chat agents. -Defines the standardized interface for task processing. -""" - -import os -import logging -import uuid -from datetime import datetime, UTC -from typing import Dict, Any, List, Optional -from modules.shared.mimeUtils import isTextMimeType, determineContentEncoding -from modules.interfaces.serviceChatModel import ChatContent, Task, AgentResponse, ChatMessage - -logger = logging.getLogger(__name__) - -class AgentBase: - """ - Base class for all chat agents. - Defines the standardized interface for task processing. - """ - - def __init__(self): - """Initialize the base agent.""" - self.name = "base" - self.label = "Base Agent" - self.description = "Base agent functionality" - self.capabilities = [] - self.service = None - - def setService(self, service): - """ - Set the service container reference and validate required interfaces. - - Args: - service: The service container with required interfaces - """ - if not service: - logger.warning("Attempted to set null service container") - return False - - # Validate required interfaces - required_interfaces = ['base', 'msft', 'google'] - missing_interfaces = [] - for interface in required_interfaces: - if not hasattr(service, interface): - missing_interfaces.append(interface) - - if missing_interfaces: - logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}") - return False - - self.service = service - return True - - def getAgentInfo(self) -> Dict[str, Any]: - """ - Return standardized information about the agent's capabilities. - - Returns: - Dictionary with name, description, and capabilities - """ - return { - "name": self.name, - "label": self.label, - "description": self.description, - "capabilities": self.capabilities - } - - async def execute(self, task: Task) -> AgentResponse: - """ - Execute a task and return the response. - This method must be implemented by all concrete agent classes. - - Args: - task: Task object containing all necessary information - - Returns: - AgentResponse object with execution results - """ - # Validate service manager - if not self.service: - logger.error("Service container not initialized") - return AgentResponse( - success=False, - message=ChatMessage( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - agentName=self.name, - message="Error: Service container not initialized", - role="system", - status="error", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=False - ), - performance={}, - progress=0.0 - ) - - try: - # Process the task using the concrete implementation - result = await self.processTask(task) - - # Create response message - message = ChatMessage( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - agentName=self.name, - message=result.get("feedback", ""), - role="assistant", - status="completed", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=True - ) - - # Create response with performance metrics - return AgentResponse( - success=True, - message=message, - performance=result.get("performance", {}), - progress=result.get("progress", 100.0) - ) - - except Exception as e: - logger.error(f"Error processing task: {str(e)}", exc_info=True) - return AgentResponse( - success=False, - message=ChatMessage( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - agentName=self.name, - message=f"Error processing task: {str(e)}", - role="system", - status="error", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=False - ), - performance={}, - progress=0.0 - ) - - async def processTask(self, task: Task) -> Dict[str, Any]: - """ - Process a task and return the results. - This method must be implemented by all concrete agent classes. - - Args: - task: Task object containing all necessary information - - Returns: - Dictionary containing: - - feedback: Text response explaining what the agent did - - performance: Optional performance metrics - - progress: Task progress (0-100) - """ - raise NotImplementedError("processTask must be implemented by concrete agent classes") - - def determineBase64EncodingFlag(self, filename: str, content: Any, mimeType: str = None) -> bool: - """ - Determine if content should be base64 encoded. - - Args: - filename: Name of the file - content: Content to check - mimeType: Optional MIME type - - Returns: - Boolean indicating if content should be base64 encoded - """ - return determineContentEncoding(filename, content, mimeType) - - def isTextMimeType(self, mimeType: str) -> bool: - """ - Check if MIME type is text-based. - - Args: - mimeType: MIME type to check - - Returns: - Boolean indicating if MIME type is text-based - """ - return isTextMimeType(mimeType) - - def formatAgentDocumentOutput(self, label: str, content: str, contentType: str, base64Encoded: bool = False) -> ChatContent: - """ - Format agent document output using ChatContent model. - - Args: - label: Document label/filename - content: Document content - contentType: MIME type of content - base64Encoded: Whether content is base64 encoded - - Returns: - ChatContent object with the following attributes: - - sequenceNr: Sequence number (defaults to 1) - - name: Document label/filename - - mimeType: MIME type of content - - data: Actual content - - metadata: Additional metadata including base64Encoded flag - """ - return ChatContent( - sequenceNr=1, - name=label, - mimeType=contentType, - data=content, - metadata={"base64Encoded": base64Encoded} - ) \ No newline at end of file diff --git a/modules/workflow/agentManager.py b/modules/workflow/agentManager.py deleted file mode 100644 index 91bae403..00000000 --- a/modules/workflow/agentManager.py +++ /dev/null @@ -1,212 +0,0 @@ -""" -Agent Manager Module for managing agent operations and execution. -""" - -import os -import logging -import importlib -from typing import Dict, Any, List, Optional, Tuple -from datetime import datetime, UTC -import uuid -from modules.interfaces.serviceChatModel import ( - ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow, AgentResponse -) -logger = logging.getLogger(__name__) - -class AgentManager: - """Manager for agent operations and execution.""" - - _instance = None - - @classmethod - def getInstance(cls): - """Return a singleton instance of the agent manager.""" - if cls._instance is None: - cls._instance = cls() - return cls._instance - - # Internal Methods - - def __init__(self): - """Initialize the agent manager.""" - if AgentManager._instance is not None: - raise RuntimeError("Singleton instance already exists - use getInstance()") - - self.service = None - self.agents = {} # Dictionary to store agent instances - self._loadAgents() # Load agents on initialization - - def _loadAgents(self): - """Load all available agents from modules dynamically.""" - logger.info("Loading agent modules...") - - # Get the agents directory path - agentDir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "agents") - - # Search for agent modules - agentModules = [] - for filename in os.listdir(agentDir): - if filename.startswith("agent") and filename.endswith(".py"): - agentModules.append(filename[:-3]) # Remove .py extension - - if not agentModules: - logger.warning("No agent modules found in directory: %s", agentDir) - return - - logger.info(f"Found {len(agentModules)} agent modules: {', '.join(agentModules)}") - - # Load each agent module - for moduleName in agentModules: - try: - # Import the module - module = importlib.import_module(f"modules.agents.{moduleName}") - - # Extract agent name from module name - agentName = moduleName.split("agent")[-1] - className = f"Agent{agentName}" - getterName = f"getAgent{agentName}" - - agent = None - - # Try to get the agent via the getter function first - if hasattr(module, getterName): - getterFunc = getattr(module, getterName) - agent = getterFunc() - logger.info(f"Agent '{agent.name}' loaded via {getterName}()") - - # If no getter, try to instantiate the agent class directly - elif hasattr(module, className): - agentClass = getattr(module, className) - agent = agentClass() - logger.info(f"Agent '{agent.name}' directly instantiated from {className}") - - if agent: - # Register the agent - if self._registerAgent(agent): - logger.info(f"Successfully registered agent: {agent.name}") - else: - logger.error(f"Failed to register agent from module: {moduleName}") - else: - logger.warning(f"No agent class or getter function found in module: {moduleName}") - - except ImportError as e: - logger.error(f"Failed to import module {moduleName}: {str(e)}") - except Exception as e: - logger.error(f"Error loading agent from module {moduleName}: {str(e)}") - - def _registerAgent(self, agent: Any): - """Register a new agent with the manager.""" - if not hasattr(agent, 'name'): - logger.error("Agent must have a name attribute") - return False - - self.agents[agent.name] = agent - if self.service and hasattr(agent, 'setService'): - agent.setService(self.service) - - return True - - # Public Methods - - def initialize(self, service: Any): - """Initialize the manager with service reference.""" - # Store service reference - self.service = service - - # Initialize agents with service - for agent in self.agents.values(): - if hasattr(agent, 'setService'): - agent.setService(service) - - return True - - def getAgent(self, agentIdentifier: str) -> Optional[Any]: - """ - Get an agent instance by its identifier. - - Args: - agentIdentifier: Name or identifier of the agent - - Returns: - Agent instance if found, None otherwise - """ - agent = self.agents.get(agentIdentifier) - if not agent: - logger.warning(f"Agent '{agentIdentifier}' not found") - return agent - - def getAllAgents(self) -> Dict[str, Any]: - """ - Get all registered agents. - - Returns: - Dictionary mapping agent names to agent instances - """ - return self.agents.copy() - - def getAgentInfos(self) -> List[Dict[str, Any]]: - """Get information about all registered agents.""" - return [ - { - 'name': agent.name, - 'description': getattr(agent, 'description', ''), - 'capabilities': getattr(agent, 'capabilities', []), - 'inputTypes': getattr(agent, 'inputTypes', []), - 'outputTypes': getattr(agent, 'outputTypes', []) - } - for agent in self.agents.values() - ] - - async def executeAgent(self, handover: Any) -> AgentResponse: - """ - Execute an agent with the given handover. - - Args: - handover: Handover object containing agent execution context - - Returns: - AgentResponse object with execution results - """ - try: - # Get agent instance - agent = self.agents.get(handover.currentAgent) - if not agent: - raise ValueError(f"Agent {handover.currentAgent} not found") - - # Execute agent - response = await agent.execute(handover) - - # Save output files if any - if response.message and response.message.documents: - self.service.document['agentOutputFilesSave'](handover, response.message.documents) - - return response - - except Exception as e: - logger.error(f"Error executing agent {handover.currentAgent}: {str(e)}") - - # Create error message - errorMessage = ChatMessage( - id=str(uuid.uuid4()), - workflowId=handover.workflowId, - agentName=handover.currentAgent, - message=f"Error executing agent: {str(e)}", - role="system", - status="error", - sequenceNr=0, - startedAt=handover.startedAt, - finishedAt=datetime.now(UTC).isoformat(), - success=False - ) - - return AgentResponse( - success=False, - message=errorMessage, - error=str(e), - performance={}, - progress=0.0 - ) - -# Singleton factory for the agent manager -def getAgentManager(): - return AgentManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/chatManager.py b/modules/workflow/chatManager.py deleted file mode 100644 index bb3e1262..00000000 --- a/modules/workflow/chatManager.py +++ /dev/null @@ -1,617 +0,0 @@ -""" -Chat Manager Module for managing chat workflows and agent handovers. -""" - -import logging -from typing import Dict, Any, List, Optional, Union -from datetime import datetime, UTC -import uuid -import json -from dataclasses import dataclass -from modules.interfaces.serviceChatModel import ( - ChatLog, ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow, - AgentHandover -) -from modules.workflow.agentManager import getAgentManager -from modules.workflow.documentManager import getDocumentManager - -logger = logging.getLogger(__name__) - -class ChatManager: - """Manager for chat workflows and agent handovers.""" - - _instance = None - - @classmethod - def getInstance(cls): - """Return a singleton instance of the chat manager.""" - if cls._instance is None: - cls._instance = cls() - return cls._instance - - # Core functions - - def __init__(self): - """Initialize the chat manager.""" - if ChatManager._instance is not None: - raise RuntimeError("Singleton instance already exists - use getInstance()") - - self.service = None - self.agentManager = getAgentManager() - self.documentManager = getDocumentManager() - - def initialize(self, workflow: ChatWorkflow): - """ - Initialize the manager with an optional workflow object. - - Args: - workflow: Optional ChatWorkflow object to initialize with - """ - # Initialize managers - self.agentManager.initialize(self.service) - self.documentManager.initialize(self.service) - - # Add basic references to service - self.service.workflow = workflow - self.service.logAdd = self.logAdd - - self.service.user = { - 'id': None, - 'name': None, - 'language': 'en' - } - self.service.functions = { - 'forEach': lambda items, action: [action(item) for item in items], - 'while': lambda condition, action: [action() for _ in iter(lambda: condition(), False)] - } - self.service.model = { - 'callAiBasic': self._callAiBasic, - 'callAiComplex': self._callAiComplex, - 'callAiImage': self._callAiImage - } - - # Initialize document operations - self.service.document = { - 'extract': self.documentManager.extractContent, - 'convertFileRefToFileId': self.documentManager.convertFileRefToId, - 'convertFileIdToFileRef': self.documentManager.convertFileIdToRef, - 'convertDataFormat': self.documentManager.convertDataFormat, - 'agentInputFilesCreate': self.documentManager.createAgentInputFileList, - 'agentOutputFilesSave': self.documentManager.saveAgentOutputFiles - } - - # Initialize data access - from modules.workflow.dataAccessFunctions import get_data_access - self.service.data = get_data_access().to_service_object() - - return True - - def createInitialHandover(self, userInput: UserInputRequest) -> AgentHandover: - """ - Create the initial handover object from user input. - - Args: - userInput: User input request - - Returns: - Initial handover object - """ - try: - # Create initial handover - handover = AgentHandover( - promptUserInitial=userInput.message, - documentsUserInitial=userInput.listFileId or [], - startedAt=datetime.now(UTC).isoformat() - ) - - # Process user input documents - if handover.documentsUserInitial: - handover.documentsInput = handover.documentsUserInitial - - # Set initial prompt for next agent - handover.promptForNextAgent = handover.promptUserInitial - - return handover - - except Exception as e: - logger.error(f"Error creating initial handover: {str(e)}") - return AgentHandover(status="failed", error=str(e)) - - async def defineNextHandover(self, currentHandover: AgentHandover) -> Optional[AgentHandover]: - """ - Define the next handover object for agent transition. - - Args: - currentHandover: Current handover object - - Returns: - Next handover object or None if no next agent - """ - try: - # Get available agents - availableAgents = self.agentManager.getAgentInfos() - if not availableAgents: - logger.warning("No available agents found") - return None - - # Create next handover object - nextHandover = AgentHandover( - promptUserInitial=currentHandover.promptUserInitial, - documentsUserInitial=currentHandover.documentsUserInitial, - startedAt=datetime.now(UTC).isoformat() - ) - - # If this is the first handover, use initial documents - if not currentHandover.promptFromFinishedAgent: - nextHandover.documentsInput = currentHandover.documentsUserInitial - nextHandover.promptForNextAgent = currentHandover.promptUserInitial - else: - # Use output documents from previous agent - nextHandover.documentsInput = currentHandover.documentsOutput - nextHandover.promptForNextAgent = currentHandover.promptFromFinishedAgent - - # Select next agent based on available agents and current state - nextAgent = await self._selectNextAgent(availableAgents, nextHandover) - if not nextAgent: - logger.info("No suitable next agent found") - return None - - nextHandover.nextAgent = nextAgent['name'] - return nextHandover - - except Exception as e: - logger.error(f"Error defining next handover: {str(e)}") - return None - - async def _selectNextAgent(self, availableAgents: List[Dict[str, Any]], handover: AgentHandover) -> Optional[Dict[str, Any]]: - """ - Select the next agent using AI analysis of the current state and requirements. - - Args: - availableAgents: List of available agents - handover: Current handover object - - Returns: - Selected agent or None if no suitable agent - """ - try: - if not availableAgents: - logger.warning("No available agents found") - return None - - # Get current workflow state - workflow = self.service.workflow - if not workflow: - logger.error("No workflow context available") - return None - - # Detect user language if not already set - if not workflow.userLanguage: - workflow.userLanguage = await self._detectUserLanguage(handover.promptUserInitial) - - # Get workflow summary for context - workflow_summary = await self.workflowSummarize(ChatMessage( - id=str(uuid.uuid4()), - workflowId=workflow.id, - role="user", - message=handover.promptUserInitial - )) - - # Prepare context for AI analysis - context = { - "current_state": { - "previous_agent": handover.currentAgent, - "status": handover.status, - "error": handover.error, - "user_language": workflow.userLanguage, - "input_documents": handover.documentsInput or [], - "output_documents": handover.documentsOutput or [], - "required_capabilities": handover.requiredCapabilities or [] - }, - "conversation_history": workflow_summary, - "available_agents": [ - { - "name": agent.get("name", ""), - "capabilities": agent.get("capabilities", {}), - "description": agent.get("description", "") - } - for agent in availableAgents - ] - } - - # Create prompt for AI to analyze and select next agent - prompt = f""" - Analyze the current workflow state, conversation history, and available agents to determine the most suitable next agent. - Consider the following factors: - 1. Previous agent's status and any errors - 2. Required capabilities for the task - 3. Document type compatibility - 4. Language requirements - 5. Agent's capabilities and specializations - 6. Conversation history and context - - Current State: - {json.dumps(context['current_state'], indent=2)} - - Conversation History: - {context['conversation_history']} - - Available Agents: - {json.dumps(context['available_agents'], indent=2)} - - Return a JSON object with the following structure: - {{ - "selected_agent": "name of the most suitable agent", - "reasoning": "brief explanation of why this agent was selected", - "required_capabilities": ["list", "of", "required", "capabilities"], - "potential_risks": ["list", "of", "potential", "issues"], - "task": {{ - "description": "clear description of what the agent needs to do", - "input_format": {{ - "documents": ["list", "of", "required", "input", "documents"], - "data": ["list", "of", "required", "data", "fields"] - }}, - "output_format": {{ - "documents": ["list", "of", "expected", "output", "documents"], - "data": ["list", "of", "expected", "output", "fields"] - }}, - "requirements": [ - "list of specific requirements", - "format requirements", - "quality requirements" - ], - "constraints": [ - "list of constraints", - "time limits", - "resource limits" - ] - }}, - "prompt_template": "template for the agent's prompt with placeholders for dynamic content" - }} - - Format your response as a valid JSON object. - """ - - # Get AI's analysis and selection - response = await self._callAiComplex(prompt) - - try: - analysis = json.loads(response) - selected_agent_name = analysis.get('selected_agent') - - # Find the selected agent in available agents - selected_agent = next( - (agent for agent in availableAgents if agent.get('name') == selected_agent_name), - None - ) - - if selected_agent: - logger.info(f"AI selected agent {selected_agent_name}: {analysis.get('reasoning')}") - # Update handover with AI's analysis - handover.requiredCapabilities = analysis.get('required_capabilities', []) - handover.analysis = { - 'reasoning': analysis.get('reasoning'), - 'potential_risks': analysis.get('potential_risks', []), - 'task': analysis.get('task', {}), - 'prompt_template': analysis.get('prompt_template', '') - } - return selected_agent - else: - logger.warning(f"AI selected agent {selected_agent_name} not found in available agents") - return None - - except json.JSONDecodeError as e: - logger.error(f"Error parsing AI response: {str(e)}") - return None - - except Exception as e: - logger.error(f"Error selecting next agent: {str(e)}") - return None - - async def processNextAgent(self, handover: AgentHandover) -> AgentHandover: - """ - Process the next agent in the workflow. - - Args: - handover: Current handover object - - Returns: - Updated handover object - """ - try: - # Get agent instance - agent = self.agentManager.getAgent(handover.nextAgent) - if not agent: - handover.update_status("failed", f"Agent {handover.nextAgent} not found") - return handover - - # Set current agent - handover.currentAgent = handover.nextAgent - handover.nextAgent = None - - # Execute agent - response = await agent.execute(handover) - - # Update handover with results - if response.success: - handover.update_status("success") - handover.documentsOutput = response.message.documents if response.message else [] - handover.promptFromFinishedAgent = response.message.message if response.message else "" - else: - handover.update_status("failed", response.error) - - return handover - - except Exception as e: - logger.error(f"Error processing next agent: {str(e)}") - handover.update_status("failed", str(e)) - return handover - - # Agent functions - - async def _callAiBasic(self, prompt: str, context: Dict[str, Any] = None) -> str: - """Call basic AI model.""" - try: - response = await self.service.base.callAi(prompt, context or {}, model="aiBase") - return response - except Exception as e: - logger.error(f"Error calling basic AI: {str(e)}") - return "" - - async def _callAiComplex(self, prompt: str, context: Dict[str, Any] = None) -> str: - """Call complex AI model.""" - try: - response = await self.service.base.callAi(prompt, context or {}, model="aiComplex") - return response - except Exception as e: - logger.error(f"Error calling complex AI: {str(e)}") - return "" - - async def _callAiImage(self, prompt: str, context: Dict[str, Any] = None) -> str: - """Call image AI model.""" - try: - response = await self.service.base.callAi(prompt, context or {}, model="aiImage") - return response - except Exception as e: - logger.error(f"Error calling image AI: {str(e)}") - return "" - - def logAdd(self, message: str, level: str = "info", - progress: Optional[int] = None) -> str: - """ - Add a log entry to the workflow. - - Args: - message: Log message - level: Log level (info, warning, error) - progress: Optional progress percentage - - Returns: - str: ID of the created log entry - """ - workflow = self.service.workflow - try: - # Generate log ID - logId = str(uuid.uuid4()) - - # Create log entry - logEntry = ChatLog( - id=logId, - workflowId=workflow.id, - message=message, - level=level, - progress=progress, - timestamp=datetime.now().isoformat() - ) - - # Add to workflow logs - workflow.logs.append(logEntry) - - # Also log to Python logger - logLevel = getattr(logging, level.upper()) - logger.log(logLevel, f"[Workflow {workflow.id}] {message}") - - # Save to database - self.chatManager.saveWorkflowLog(workflow.id, logEntry.to_dict()) - - return logId - - except Exception as e: - logger.error(f"Error adding log entry: {str(e)}") - return "" - - async def chatMessageToWorkflow(self, role: str, agent: Union[str, Dict[str, Any]], chatMessage: UserInputRequest) -> ChatMessage: - """ - Integrates chat message input into a Message object including files with complete contents. - - Args: - role: Role of the message sender (e.g., 'user', 'assistant') - agent: Agent name or configuration - chatMessage: UserInputRequest object containing message data and file references - - Returns: - ChatMessage object with complete file contents - """ - try: - # Process additional files with complete contents - additionalFileIds = chatMessage.listFileId or [] - additionalFiles = await self.processFileIds(additionalFileIds) - - # Create message object - message = ChatMessage( - id=str(uuid.uuid4()), - workflowId=self.service.workflow.id, - role=role, - agentName=agent if isinstance(agent, str) else agent.get("name", ""), - message=chatMessage.message, - documents=additionalFiles, - status="completed", - startedAt=datetime.now().isoformat() - ) - - return message - - except Exception as e: - logger.error(f"Error creating workflow message: {str(e)}") - raise - - async def sendFinalMessage(self, handover: AgentHandover) -> ChatMessage: - """ - Send final message to user with workflow results. - - Args: - handover: Final handover object - - Returns: - Final message to user - """ - try: - # Create final message content from handover - messageContent = handover.promptFromFinishedAgent - if handover.status == "failed": - messageContent = f"Workflow failed: {handover.error}" - - # Add summary of generated documents - if handover.documentsOutput: - messageContent += "\n\nGenerated documents:" - for doc in handover.documentsOutput: - messageContent += f"\n- {doc.get('name', 'Unknown')}" - - # Create message object - finalMessage = ChatMessage( - id=str(uuid.uuid4()), - workflowId=self.service.workflow.id, - agentName="Workflow Manager", - message=messageContent, - role="assistant", - status="completed", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=handover.status == "success", - documents=handover.documentsOutput - ) - - return finalMessage - - except Exception as e: - logger.error(f"Error sending final message: {str(e)}") - return ChatMessage( - id=str(uuid.uuid4()), - workflowId=self.service.workflow.id, - agentName="Workflow Manager", - message=f"Error in workflow: {str(e)}", - role="system", - status="error", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=False - ) - - async def workflowSummarize(self, messageUser: ChatMessage) -> str: - """ - Creates a summary of the workflow without the current user message. - - Args: - messageUser: Current user message - - Returns: - Summary of the workflow - """ - if not self.service.workflow or "messages" not in self.service.workflow or not self.service.workflow["messages"]: - return "" # First message - - # Go through messages in chronological order - messages = sorted(self.service.workflow["messages"], key=lambda m: m.get("sequenceNo", 0), reverse=False) - - summaryParts = [] - for message in messages: - if message["id"] != messageUser["id"]: - messageSummary = await self.messageSummarize(message) - summaryParts.append(messageSummary) - - return "\n\n".join(summaryParts) - - async def messageSummarize(self, message: ChatMessage) -> str: - """ - Creates a summary of a message including its documents. - - Args: - message: Message to summarize - - Returns: - Summary of the message - """ - role = message.role - agentName = message.agentName - content = message.content - - try: - # Use the serviceBase for language-aware AI calls - prompt = f"Create a very concise summary (2-3 sentences, maximum 300 characters) of the following message:\n\n{content}" - contentSummary = await self._callAiBasic(prompt) - except Exception as e: - logger.error(f"Error creating summary: {str(e)}") - contentSummary = content[:200] + "..." - - # Summarize documents - docsSummary = "" - if "documents" in message and message["documents"]: - docsList = [] - for i, doc in enumerate(message["documents"]): - docName = self.getFilename(doc) - docsList.append(docName) - if docsList: - docsSummary = "\nDocuments:" + "\n- ".join(docsList) - - return f"[{role} {agentName}]: {contentSummary}{docsSummary}" - - def getFilename(self, document: ChatDocument) -> str: - """ - Gets the filename from a document by combining name and extension. - - Args: - document: Document object - - Returns: - Filename with extension - """ - name = document.name - ext = document.ext - if ext: - return f"{name}.{ext}" - return name - - async def _detectUserLanguage(self, text: str) -> str: - """ - Detects the language of user input using AI. - - Args: - text: User input text to analyze - - Returns: - Language code (e.g., 'en', 'de', 'fr') - """ - try: - # Use basic AI model for language detection - prompt = f""" - Analyze the following text and identify its language. - Return only the ISO 639-1 language code (e.g., 'en' for English, 'de' for German). - - Text: {text} - """ - response = await self._callAiBasic(prompt) - # Clean and validate response - lang_code = response.strip().lower() - # Basic validation of common language codes - valid_codes = {'en', 'de', 'fr', 'es', 'it', 'pt', 'nl', 'ru', 'zh', 'ja', 'ko'} - return lang_code if lang_code in valid_codes else 'en' - except Exception as e: - logger.error(f"Error detecting language: {str(e)}") - return 'en' # Default to English on error - - -# Singleton factory for the chat manager -def getChatManager(): - return ChatManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/dataAccessFunctions.py b/modules/workflow/dataAccessFunctions.py deleted file mode 100644 index 87448544..00000000 --- a/modules/workflow/dataAccessFunctions.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -Data access functions for Microsoft and Google services. -Provides standardized interfaces for SharePoint, Outlook, and other services. -""" - -from typing import List, Dict, Any, Optional, Union -from datetime import datetime -from pydantic import BaseModel, Field -from enum import Enum - -class ServiceType(str, Enum): - """Service types for data access""" - MSFT = "msft" - GOOGLE = "google" - -class FileRef(BaseModel): - """Reference to a file in storage""" - id: str - name: str - path: str - url: Optional[str] = None - size: Optional[int] = None - lastModified: Optional[datetime] = None - -# SharePoint Functions -class SharePointSearchParams(BaseModel): - """Parameters for SharePoint search""" - userName: str - query: str - site: Optional[str] = None - folder: Optional[str] = None - contentType: Optional[str] = None - createdAfter: Optional[datetime] = None - modifiedAfter: Optional[datetime] = None - maxResults: Optional[int] = 100 - -class SharePointFolderParams(BaseModel): - """Parameters for SharePoint folder operations""" - userName: str - folderPattern: str - site: Optional[str] = None - recursive: bool = False - includeFiles: bool = True - -class SharePointFileParams(BaseModel): - """Parameters for SharePoint file operations""" - userName: str - fileName: str - site: Optional[str] = None - folder: Optional[str] = None - content: Optional[bytes] = None - contentType: Optional[str] = None - -async def Msft_Sharepoint_Search(params: SharePointSearchParams) -> List[Dict[str, Any]]: - """Search SharePoint for files and folders matching criteria""" - # Implementation would go here - pass - -async def Msft_Sharepoint_GetFolders(params: SharePointFolderParams) -> Dict[str, Any]: - """Get SharePoint folders matching pattern""" - # Implementation would go here - pass - -async def Msft_Sharepoint_GetFiles(params: SharePointFileParams) -> Dict[str, Any]: - """Get SharePoint files matching pattern""" - # Implementation would go here - pass - -async def Msft_Sharepoint_GetFile(params: SharePointFileParams) -> Dict[str, Any]: - """Get specific SharePoint file""" - # Implementation would go here - pass - -async def Msft_Sharepoint_PutFile(params: SharePointFileParams) -> FileRef: - """Upload file to SharePoint""" - # Implementation would go here - pass - -# Outlook Mail Functions -class OutlookMailParams(BaseModel): - """Parameters for Outlook mail operations""" - userName: str - folder: Optional[str] = None - messageId: Optional[str] = None - subject: Optional[str] = None - body: Optional[str] = None - to: Optional[List[str]] = None - cc: Optional[List[str]] = None - bcc: Optional[List[str]] = None - attachments: Optional[List[FileRef]] = None - searchString: Optional[str] = None - fromAddress: Optional[str] = None - receivedAfter: Optional[datetime] = None - maxResults: Optional[int] = 100 - -async def Msft_Outlook_ReadMails(params: OutlookMailParams) -> List[Dict[str, Any]]: - """Read multiple emails from Outlook""" - # Implementation would go here - pass - -async def Msft_Outlook_ReadMail(params: OutlookMailParams) -> Dict[str, Any]: - """Read specific email from Outlook""" - # Implementation would go here - pass - -async def Msft_Outlook_DraftMail(params: OutlookMailParams) -> Dict[str, Any]: - """Create draft email in Outlook""" - # Implementation would go here - pass - -async def Msft_Outlook_SendMail(params: OutlookMailParams) -> Dict[str, Any]: - """Send email through Outlook""" - # Implementation would go here - pass - -# Outlook Calendar Functions -class OutlookCalendarParams(BaseModel): - """Parameters for Outlook calendar operations""" - userName: str - calendar: Optional[str] = None - eventId: Optional[str] = None - subject: Optional[str] = None - body: Optional[str] = None - startTime: Optional[datetime] = None - endTime: Optional[datetime] = None - location: Optional[str] = None - organizer: Optional[str] = None - attendees: Optional[List[str]] = None - searchString: Optional[str] = None - maxResults: Optional[int] = 100 - -async def Msft_Outlook_ReadAppointments(params: OutlookCalendarParams) -> List[Dict[str, Any]]: - """Read multiple calendar appointments""" - # Implementation would go here - pass - -async def Msft_Outlook_CreateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: - """Create new calendar appointment""" - # Implementation would go here - pass - -async def Msft_Outlook_ReadAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: - """Read specific calendar appointment""" - # Implementation would go here - pass - -async def Msft_Outlook_UpdateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: - """Update existing calendar appointment""" - # Implementation would go here - pass - -async def Msft_Outlook_DeleteAppointment(params: OutlookCalendarParams) -> bool: - """Delete calendar appointment""" - # Implementation would go here - pass - -def get_data_access_functions() -> List[Dict[str, Any]]: - """ - Dynamically generates a comprehensive list of all available data access functions - with their parameters for use in agent prompts. - """ - import inspect - import sys - - functions = [] - current_module = sys.modules[__name__] - - # Get all functions in the module - for name, obj in inspect.getmembers(current_module): - # Check if it's a function and starts with Msft_ or Google_ - if inspect.isfunction(obj) and (name.startswith('Msft_') or name.startswith('Google_')): - # Get function signature - sig = inspect.signature(obj) - - # Get return type annotation - return_type = obj.__annotations__.get('return', 'Any') - if hasattr(return_type, '__origin__'): - return_type = str(return_type) - - # Get parameter model class - param_model = None - for param in sig.parameters.values(): - if param.annotation.__module__ == __name__: - param_model = param.annotation - break - - # Determine authority from function name - authority = ServiceType.MSFT if name.startswith('Msft_') else ServiceType.GOOGLE - - # Create function entry - function_entry = { - "name": name, - "description": obj.__doc__ or "", - "parameters": param_model.schema() if param_model else {}, - "return_type": str(return_type), - "authority": authority - } - - functions.append(function_entry) - - return functions - -class DataAccess: - """Manages data access functions for different services""" - - def __init__(self): - """Initialize the data access manager""" - self.functions = get_data_access_functions() - self._initialize_functions() - - def _initialize_functions(self): - """Initialize function groups and metadata""" - # Group functions by service type - self.msft_functions = {} - self.google_functions = {} - - for func in self.functions: - func_name = func['name'] - # Get the actual function object - func_obj = globals()[func_name] - - if func['authority'] == ServiceType.MSFT: - self.msft_functions[func_name] = func_obj - else: - self.google_functions[func_name] = func_obj - - @property - def msft(self) -> Dict[str, Any]: - """Get Microsoft service functions and metadata""" - return { - 'functions': self.msft_functions, - 'metadata': { - 'name': 'Microsoft Services', - 'description': 'Microsoft Office 365 and SharePoint services', - 'functions': [f for f in self.functions if f['authority'] == ServiceType.MSFT] - } - } - - @property - def google(self) -> Dict[str, Any]: - """Get Google service functions and metadata""" - return { - 'functions': self.google_functions, - 'metadata': { - 'name': 'Google Services', - 'description': 'Google Workspace services', - 'functions': [f for f in self.functions if f['authority'] == ServiceType.GOOGLE] - } - } - - @property - def utils(self) -> Dict[str, Any]: - """Get utility functions for data access""" - return { - 'getAvailableFunctions': lambda: self.functions, - 'getFunctionInfo': lambda name: next((f for f in self.functions if f['name'] == name), None), - 'getServiceFunctions': lambda service_type: [f for f in self.functions if f['authority'] == service_type] - } - - def to_service_object(self) -> Dict[str, Any]: - """Convert to service object format""" - return { - 'msft': self.msft, - 'google': self.google, - 'utils': self.utils - } - -def get_data_access() -> DataAccess: - """Get a singleton instance of the data access manager""" - if not hasattr(get_data_access, '_instance'): - get_data_access._instance = DataAccess() - return get_data_access._instance - diff --git a/modules/workflow/documentManager.py b/modules/workflow/documentManager.py deleted file mode 100644 index 6e627c3a..00000000 --- a/modules/workflow/documentManager.py +++ /dev/null @@ -1,396 +0,0 @@ -""" -Document Manager Module for handling document operations and content extraction. -""" - -import logging -from typing import Dict, Any, List, Optional -from datetime import datetime -from modules.interfaces.serviceChatModel import ChatDocument, ChatContent -from modules.workflow.documentProcessor import getDocumentContents -import uuid -import json -import base64 - -logger = logging.getLogger(__name__) - -class DocumentManager: - """Manager for document operations and content extraction.""" - - _instance = None - - @classmethod - def getInstance(cls): - """Return a singleton instance of the document manager.""" - if cls._instance is None: - cls._instance = cls() - return cls._instance - - def __init__(self): - """Initialize the document manager.""" - if DocumentManager._instance is not None: - raise RuntimeError("Singleton instance already exists - use getInstance()") - - self.service = None - - def initialize(self, service=None): - """Initialize or update the manager with service references.""" - if service: - # Validate required interfaces - required_interfaces = ['base', 'msft', 'google'] - missing_interfaces = [] - for interface in required_interfaces: - if not hasattr(service, interface): - missing_interfaces.append(interface) - - if missing_interfaces: - logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}") - return False - - self.service = service - return True - - async def extractContent(self, fileId: str) -> Optional[ChatDocument]: - """ - Extract content from a file. - - Args: - fileId: ID of the file to extract content from - - Returns: - ChatDocument object if successful, None otherwise - """ - try: - # Get file content - fileContent = await self.getFileContent(fileId) - if not fileContent: - return None - - # Get file metadata - fileMetadata = await self.getFileMetadata(fileId) - if not fileMetadata: - return None - - # Create ChatDocument - return ChatDocument( - id=str(uuid.uuid4()), - fileId=fileId, - filename=fileMetadata.get("name", "Unknown"), - fileSize=fileMetadata.get("size", 0), - content=fileContent.decode('utf-8', errors='ignore'), - mimeType=fileMetadata.get("mimeType", "text/plain") - ) - except Exception as e: - logger.error(f"Error extracting content from file {fileId}: {str(e)}") - return None - - async def getFileContent(self, fileId: str) -> Optional[bytes]: - """Gets the content of a file.""" - try: - return self.service.functions.getFileData(fileId) - except Exception as e: - logger.error(f"Error getting file content for {fileId}: {str(e)}") - return None - - async def getFileMetadata(self, fileId: str) -> Optional[Dict[str, Any]]: - """Gets the metadata of a file.""" - try: - return self.service.functions.getFile(fileId) - except Exception as e: - logger.error(f"Error getting file metadata for {fileId}: {str(e)}") - return None - - async def saveFile(self, filename: str, content: bytes, mimeType: str) -> Optional[int]: - """ - Save a new file. - - Args: - filename: Name of the file - content: File content as bytes - mimeType: MIME type of the file - - Returns: - File ID if successful, None otherwise - """ - try: - return await self.service.base.saveFile(filename, content, mimeType) - except Exception as e: - logger.error(f"Error saving file {filename}: {str(e)}") - return None - - async def deleteFile(self, fileId: str) -> bool: - """Deletes a file.""" - try: - return self.service.functions.deleteFile(fileId) - except Exception as e: - logger.error(f"Error deleting file {fileId}: {str(e)}") - return False - - async def convertFileRefToId(self, ref: str) -> Optional[int]: - """ - Convert agent file reference to file ID. - - Args: - ref: File reference in format 'filename;id' or just 'id' - - Returns: - File ID if successful, None otherwise - """ - try: - # Extract file ID from reference format - if isinstance(ref, str) and ';' in ref: - return int(ref.split(';')[1]) - return int(ref) - except Exception as e: - logger.error(f"Error converting file reference to ID: {str(e)}") - return None - - async def convertFileIdToRef(self, fileId: str) -> Optional[str]: - """ - Convert file ID to agent file reference. - - Args: - fileId: File ID to convert - - Returns: - File reference in format 'filename;id' if successful, None otherwise - """ - try: - file = await self.getFileMetadata(fileId) - if not file: - return None - return f"{file['name']};{fileId}" - except Exception as e: - logger.error(f"Error converting file ID to reference: {str(e)}") - return None - - async def convertDataFormat(self, data: Any, format: str) -> Any: - """ - Convert data between different formats. - - Args: - data: Data to convert - format: Target format ('json', 'base64', etc.) - - Returns: - Converted data - """ - try: - if format == 'json': - if isinstance(data, str): - return json.loads(data) - return json.dumps(data) - elif format == 'base64': - if isinstance(data, str): - return base64.b64encode(data.encode('utf-8')).decode('utf-8') - return base64.b64encode(data).decode('utf-8') - return data - except Exception as e: - logger.error(f"Error converting data format: {str(e)}") - return data - - async def createAgentInputFileList(self, files: List[str]) -> List[Dict[str, Any]]: - """ - Create a list of input files for agent processing. - - Args: - files: List of file references - - Returns: - List of file objects with content - """ - try: - inputFiles = [] - for file in files: - fileId = await self.convertFileRefToId(file) - if fileId: - fileData = await self.getFileMetadata(fileId) - if fileData: - content = await self.getFileContent(fileId) - inputFiles.append({ - 'id': fileId, - 'name': fileData['name'], - 'mimeType': fileData['mimeType'], - 'content': content - }) - return inputFiles - except Exception as e: - logger.error(f"Error creating agent input file list: {str(e)}") - return [] - - async def saveAgentOutputFiles(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """ - Save output files from agent processing. - - Args: - files: List of file objects with content - - Returns: - List of saved file metadata - """ - try: - savedFiles = [] - for file in files: - # Create file metadata - fileMeta = await self.saveFile( - filename=file['name'], - content=file['content'], - mimeType=file.get('mimeType', 'application/octet-stream') - ) - - if fileMeta: - savedFiles.append({ - 'id': fileMeta, - 'name': file['name'], - 'mimeType': file.get('mimeType', 'application/octet-stream') - }) - return savedFiles - except Exception as e: - logger.error(f"Error saving agent output files: {str(e)}") - return [] - - async def contentWithPrompt(self, document: Dict[str, Any], prompt: str) -> Optional[Dict[str, Any]]: - """ - Extract content from a document using AI with a specific prompt. - Handles large files by processing in chunks and merging results. - - Args: - document: Document object with file information - prompt: Specific prompt for content extraction - - Returns: - Dictionary with extracted content and metadata - """ - try: - # First get the document content - chat_doc = await self.extractContent(document.get('id')) - if not chat_doc: - return None - - # Prepare the content for AI processing - content = chat_doc.content - mime_type = chat_doc.mimeType - - # For large files, process in chunks - if len(content) > 100000: # Arbitrary threshold, adjust as needed - chunks = self._splitContentIntoChunks(content, mime_type) - extracted_chunks = [] - - for chunk in chunks: - # Process each chunk with AI - chunk_result = await self._processContentChunk(chunk, prompt) - if chunk_result: - extracted_chunks.append(chunk_result) - - # Merge results - return { - "content": self._mergeChunkResults(extracted_chunks), - "metadata": { - "original_size": len(content), - "chunks_processed": len(chunks), - "mime_type": mime_type - } - } - else: - # Process single chunk - result = await self._processContentChunk(content, prompt) - return { - "content": result, - "metadata": { - "original_size": len(content), - "chunks_processed": 1, - "mime_type": mime_type - } - } - - except Exception as e: - logger.error(f"Error in contentWithPrompt: {str(e)}") - return None - - def _splitContentIntoChunks(self, content: str, mime_type: str) -> List[str]: - """ - Split content into manageable chunks based on mime type. - - Args: - content: Content to split - mime_type: MIME type of the content - - Returns: - List of content chunks - """ - try: - if mime_type.startswith('text/'): - # Split text content by paragraphs or sections - return [chunk.strip() for chunk in content.split('\n\n') if chunk.strip()] - elif mime_type == 'application/json': - # Split JSON content by objects - data = json.loads(content) - if isinstance(data, list): - return [json.dumps(item) for item in data] - return [content] - else: - # Default chunking - return [content[i:i+10000] for i in range(0, len(content), 10000)] - except Exception as e: - logger.error(f"Error splitting content: {str(e)}") - return [content] - - async def _processContentChunk(self, chunk: str, prompt: str) -> Optional[str]: - """ - Process a single content chunk with AI. - - Args: - chunk: Content chunk to process - prompt: Extraction prompt - - Returns: - Processed content - """ - try: - # Create AI prompt - ai_prompt = f""" - Extract relevant information from this content based on the following prompt: - - PROMPT: {prompt} - - CONTENT: - {chunk} - - Return ONLY the extracted information in a clear, concise format. - """ - - # Get AI response - response = await self.service.base.callAi([ - {"role": "system", "content": "You are an expert at extracting relevant information from documents."}, - {"role": "user", "content": ai_prompt} - ]) - - return response.strip() - - except Exception as e: - logger.error(f"Error processing content chunk: {str(e)}") - return None - - def _mergeChunkResults(self, chunks: List[str]) -> str: - """ - Merge processed content chunks into a single result. - - Args: - chunks: List of processed chunks - - Returns: - Merged content - """ - try: - # Remove duplicates and empty chunks - chunks = [chunk for chunk in chunks if chunk and chunk.strip()] - - # Merge chunks with appropriate spacing - return "\n\n".join(chunks) - - except Exception as e: - logger.error(f"Error merging chunk results: {str(e)}") - return "" - -# Singleton factory for the document manager -def getDocumentManager(): - return DocumentManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/documentProcessor.py b/modules/workflow/documentProcessor.py deleted file mode 100644 index bd099128..00000000 --- a/modules/workflow/documentProcessor.py +++ /dev/null @@ -1,1008 +0,0 @@ -""" -Module for extracting content from various file formats. -Provides specialized functions for processing text, PDF, Office documents, images, etc. -""" - -import logging -import os -import io -from typing import Dict, Any, List, Optional, Union, Tuple -import base64 -from modules.interfaces.serviceChatModel import ChatContent - -# Configure logger -logger = logging.getLogger(__name__) - -# Optional imports - only loaded when needed -pdfExtractorLoaded = False -officeExtractorLoaded = False -imageProcessorLoaded = False - -class FileProcessingError(Exception): - """Custom exception for file processing errors.""" - pass - -def getDocumentContents(fileMetadata: Dict[str, Any], fileContent: bytes) -> List[ChatContent]: - """ - Main function for extracting content from a file based on its MIME type. - Delegates to specialized extraction functions. - - Args: - fileMetadata: File metadata (Name, MIME type, etc.) - fileContent: Binary data of the file - - Returns: - List of ChatContent objects with metadata and base64Encoded flag - """ - try: - mimeType = fileMetadata.get("mimeType", "application/octet-stream") - fileName = fileMetadata.get("name", "unknown") - - logger.info(f"Extracting content from file '{fileName}' (MIME type: {mimeType})") - - # Extract content based on MIME type - contents = [] - - # Try to detect actual file type from content for unknown MIME types - if mimeType == "application/octet-stream": - # Check file extension first - ext = os.path.splitext(fileName)[1].lower() - if ext: - # Map common extensions to MIME types - ext_to_mime = { - '.txt': 'text/plain', - '.md': 'text/markdown', - '.csv': 'text/csv', - '.json': 'application/json', - '.xml': 'application/xml', - '.js': 'application/javascript', - '.py': 'application/x-python', - '.svg': 'image/svg+xml', - '.jpg': 'image/jpeg', - '.jpeg': 'image/jpeg', - '.png': 'image/png', - '.gif': 'image/gif', - '.pdf': 'application/pdf', - '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', - '.doc': 'application/msword', - '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', - '.xls': 'application/vnd.ms-excel', - '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation', - '.ppt': 'application/vnd.ms-powerpoint' - } - if ext in ext_to_mime: - mimeType = ext_to_mime[ext] - logger.info(f"Detected MIME type {mimeType} from extension {ext}") - else: - logger.warning(f"Unknown file extension {ext} for file {fileName}") - - # Try to detect if it's text content - try: - text_content = fileContent.decode('utf-8') - logger.info(f"Successfully decoded file {fileName} as text") - contents.extend(extractTextContent(fileName, fileContent, "text/plain")) - except UnicodeDecodeError: - logger.info(f"File {fileName} is not text, treating as binary") - contents.extend(extractBinaryContent(fileName, fileContent, mimeType)) - - # Text-based formats (excluding CSV which has its own handler) - elif mimeType == "text/csv": - contents.extend(extractCsvContent(fileName, fileContent)) - - # Then handle other text-based formats - elif mimeType.startswith("text/") or mimeType in [ - "application/json", - "application/xml", - "application/javascript", - "application/x-python" - ]: - contents.extend(extractTextContent(fileName, fileContent, mimeType)) - - # SVG Files - elif mimeType == "image/svg+xml": - contents.extend(extractSvgContent(fileName, fileContent)) - - # Images - elif mimeType.startswith("image/"): - contents.extend(extractImageContent(fileName, fileContent, mimeType)) - - # PDF Documents - elif mimeType == "application/pdf": - contents.extend(extractPdfContent(fileName, fileContent)) - - # Word Documents - elif mimeType in [ - "application/vnd.openxmlformats-officedocument.wordprocessingml.document", - "application/msword" - ]: - contents.extend(extractWordContent(fileName, fileContent, mimeType)) - - # Excel Documents - elif mimeType in [ - "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", - "application/vnd.ms-excel" - ]: - contents.extend(extractExcelContent(fileName, fileContent, mimeType)) - - # PowerPoint Documents - elif mimeType in [ - "application/vnd.openxmlformats-officedocument.presentationml.presentation", - "application/vnd.ms-powerpoint" - ]: - contents.extend(extractPowerpointContent(fileName, fileContent, mimeType)) - - # Binary data as fallback for unknown formats - else: - logger.warning(f"Unknown MIME type {mimeType} for file {fileName}, treating as binary") - contents.extend(extractBinaryContent(fileName, fileContent, mimeType)) - - # Fallback when no content could be extracted - if not contents: - logger.warning(f"No content extracted from file '{fileName}', using binary fallback") - - # Convert binary content to base64 - encoded_data = base64.b64encode(fileContent).decode('utf-8') - - contents.append(ChatContent( - sequenceNr=1, - name='1_undefined', - mimeType=mimeType, - data=encoded_data, - metadata={ - "isText": False, - "base64Encoded": True - } - )) - - # Add generic attributes for all documents - for content in contents: - # Make sure all content items have the base64Encoded flag - if not hasattr(content, "base64Encoded"): - if isinstance(content.data, bytes): - # Convert bytes to base64 - content.data = base64.b64encode(content.data).decode('utf-8') - content.base64Encoded = True - else: - # Assume text content if not explicitly marked - content.base64Encoded = False - - # Maintain backward compatibility with old "base64Encoded" flag in metadata - if not content.metadata: - content.metadata = {} - - # Set base64Encoded in metadata for backward compatibility - content.metadata["base64Encoded"] = content.base64Encoded - - logger.info(f"Successfully extracted {len(contents)} content items from file '{fileName}'") - return contents - - except Exception as e: - logger.error(f"Error during content extraction for file {fileMetadata.get('name', 'unknown')}: {str(e)}", exc_info=True) - # Fallback on error - return original data - return [ChatContent( - sequenceNr=1, - name=fileMetadata.get("name", "unknown"), - mimeType=fileMetadata.get("mimeType", "application/octet-stream"), - data=base64.b64encode(fileContent).decode('utf-8'), - metadata={ - "isText": False, - "base64Encoded": True - } - )] - - -def _loadPdfExtractor(): - """Loads PDF extraction libraries when needed""" - global pdfExtractorLoaded - if not pdfExtractorLoaded: - try: - global PyPDF2, fitz - import PyPDF2 - import fitz # PyMuPDF for more extensive PDF processing - pdfExtractorLoaded = True - logger.info("PDF extraction libraries successfully loaded") - except ImportError as e: - logger.warning(f"PDF extraction libraries could not be loaded: {e}") - -def _loadOfficeExtractor(): - """Loads Office document extraction libraries when needed""" - global officeExtractorLoaded - if not officeExtractorLoaded: - try: - global docx, openpyxl - import docx # python-docx for Word documents - import openpyxl # for Excel files - officeExtractorLoaded = True - logger.info("Office extraction libraries successfully loaded") - except ImportError as e: - logger.warning(f"Office extraction libraries could not be loaded: {e}") - -def _loadImageProcessor(): - """Loads image processing libraries when needed""" - global imageProcessorLoaded - if not imageProcessorLoaded: - try: - global PIL, Image - from PIL import Image - imageProcessorLoaded = True - logger.info("Image processing libraries successfully loaded") - except ImportError as e: - logger.warning(f"Image processing libraries could not be loaded: {e}") - -def extractTextContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]: - """ - Extracts text from text files. - - Args: - fileName: Name of the file - fileContent: Binary data of the file - mimeType: MIME type of the file - - Returns: - List of Text-Content objects with base64Encoded = False - """ - try: - # Keep original file extension - fileExtension = os.path.splitext(fileName)[1][1:] if os.path.splitext(fileName)[1] else "txt" - - # Extract text content - textContent = fileContent.decode('utf-8') - return [{ - "sequenceNr": 1, - "name": "1_text", # Simplified naming - "ext": fileExtension, - "mimeType": "text/plain", - "data": textContent, - "base64Encoded": False, - "metadata": { - "isText": True - } - }] - except UnicodeDecodeError: - logger.warning(f"Could not decode text from file '{fileName}' as UTF-8, trying alternative encodings") - try: - # Try alternative encodings - for encoding in ['latin-1', 'cp1252', 'iso-8859-1']: - try: - textContent = fileContent.decode(encoding) - logger.info(f"Text successfully decoded with encoding {encoding}") - return [{ - "sequenceNr": 1, - "name": "1_text", # Simplified naming - "ext": fileExtension, - "mimeType": "text/plain", - "data": textContent, - "base64Encoded": False, - "metadata": { - "isText": True, - "encoding": encoding - } - }] - except UnicodeDecodeError: - continue - - # Fallback to binary data if no encoding works - logger.warning(f"Could not decode text, using binary data") - return [{ - "sequenceNr": 1, - "name": "1_binary", # Simplified naming - "ext": fileExtension, - "mimeType": mimeType, - "data": base64.b64encode(fileContent).decode('utf-8'), - "base64Encoded": True, - "metadata": { - "isText": False - } - }] - except Exception as e: - logger.error(f"Error in alternative text decoding: {str(e)}") - # Return binary data as fallback - return [{ - "sequenceNr": 1, - "name": "1_binary", # Simplified naming - "ext": fileExtension, - "mimeType": mimeType, - "data": base64.b64encode(fileContent).decode('utf-8'), - "base64Encoded": True, - "metadata": { - "isText": False - } - }] - -def extractCsvContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]: - """ - Extracts content from CSV files. - - Args: - fileName: Name of the file - fileContent: Binary data of the file - - Returns: - List of CSV-Content objects with base64Encoded = False - """ - try: - # Extract text content - csvContent = fileContent.decode('utf-8') - return [{ - "sequenceNr": 1, - "name": "1_csv", # Simplified naming - "ext": "csv", - "mimeType": "text/csv", - "data": csvContent, - "base64Encoded": False, - "metadata": { - "isText": True, - "format": "csv" - } - }] - except UnicodeDecodeError: - logger.warning(f"Could not decode CSV from file '{fileName}' as UTF-8, trying alternative encodings") - try: - # Try alternative encodings for CSV - for encoding in ['latin-1', 'cp1252', 'iso-8859-1']: - try: - csvContent = fileContent.decode(encoding) - logger.info(f"CSV successfully decoded with encoding {encoding}") - return [{ - "sequenceNr": 1, - "name": "1_csv", # Simplified naming - "ext": "csv", - "mimeType": "text/csv", - "data": csvContent, - "base64Encoded": False, - "metadata": { - "isText": True, - "encoding": encoding, - "format": "csv" - } - }] - except UnicodeDecodeError: - continue - - # Fallback to binary data - return [{ - "sequenceNr": 1, - "name": "1_binary", # Simplified naming - "ext": "csv", - "mimeType": "text/csv", - "data": base64.b64encode(fileContent).decode('utf-8'), - "base64Encoded": True, - "metadata": { - "isText": False - } - }] - except Exception as e: - logger.error(f"Error in alternative CSV decoding: {str(e)}") - return [{ - "sequenceNr": 1, - "name": "1_binary", # Simplified naming - "ext": "csv", - "mimeType": "text/csv", - "data": base64.b64encode(fileContent).decode('utf-8'), - "base64Encoded": True, - "metadata": { - "isText": False - } - }] - -def extractSvgContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]: - """ - Extracts content from SVG files. - - Args: - fileName: Name of the file - fileContent: Binary data of the file - - Returns: - List of SVG-Content objects with dual text/image metadata - """ - contents = [] - - try: - # Extract SVG as text content (XML) - svgText = fileContent.decode('utf-8') - - # Check if it's actually SVG by looking for the SVG tag - if "