diff --git a/modules/agents/z_agentAnalyst copy.py b/modules/agents/z_agentAnalyst copy.py
deleted file mode 100644
index c2d392f4..00000000
--- a/modules/agents/z_agentAnalyst copy.py
+++ /dev/null
@@ -1,902 +0,0 @@
-"""
-Data analyst agent for analysis and interpretation of data.
-Focuses on output-first design with AI-powered analysis.
-"""
-
-import logging
-import json
-import io
-import base64
-import os
-import time
-from typing import Dict, Any, List, Optional
-import pandas as pd
-import matplotlib.pyplot as plt
-import seaborn as sns
-from datetime import datetime
-import hashlib
-import uuid
-import re
-import shutil
-from pathlib import Path
-import traceback
-import sys
-import importlib.util
-import inspect
-from pydantic import BaseModel
-
-from modules.workflow.agentBase import AgentBase
-from modules.interfaces.serviceChatModel import ChatContent
-
-logger = logging.getLogger(__name__)
-
-class AgentAnalyst(AgentBase):
- """AI-driven agent for data analysis and visualization"""
-
- def __init__(self):
- """Initialize the data analysis agent"""
- super().__init__()
- self.name = "analyst"
- self.label = "Data Analysis"
- self.description = "Analyzes data using AI-powered insights and visualizations, produce diagrams and visualizations"
- self.capabilities = [
- "dataAnalysis",
- "statistics",
- "visualization",
- "dataInterpretation",
- "reportGeneration"
- ]
-
- # Set default visualization settings
- plt.style.use('seaborn-v0_8-whitegrid')
-
- def setDependencies(self, serviceBase=None):
- """Set external dependencies for the agent."""
- self.setService(serviceBase)
-
- async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]:
- """
- Process a task by focusing on required outputs and using AI to guide the analysis process.
-
- Args:
- task: Task dictionary with prompt, inputDocuments, outputSpecifications
-
- Returns:
- Dictionary with feedback and documents
- """
- try:
- # Extract task information
- prompt = task.get("prompt", "")
- outputSpecs = task.get("outputSpecifications", [])
- workflow = task.get("context", {}).get("workflow", {})
-
- # Check AI service
- if not self.service or not self.service.base:
- return {
- "feedback": "The Analyst agent requires an AI service to function effectively.",
- "documents": []
- }
-
- # Create analysis plan
- if workflow:
- self.service.logAdd(workflow, "Extracting data from documents...", level="info", progress=35)
- analysisPlan = await self._createAnalysisPlan(prompt)
-
- # Check if this is truly an analysis task
- if not analysisPlan.get("requiresAnalysis", True):
- return {
- "feedback": "This task doesn't appear to require analysis. Please try a different agent.",
- "documents": []
- }
-
- # Analyze data
- if workflow:
- self.service.logAdd(workflow, "Analyzing task requirements...", level="info", progress=45)
- analysisResults = await self._analyzeData(task, analysisPlan)
-
- # Format results into requested output documents
- totalSpecs = len(outputSpecs)
- for i, spec in enumerate(outputSpecs):
- progress = 50 + int((i / totalSpecs) * 40) # Progress from 50% to 90%
- self.service.logAdd(workflow, f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress)
-
- documents = await self._createOutputDocuments(
- prompt,
- analysisResults,
- outputSpecs,
- analysisPlan
- )
-
- # Generate feedback
- feedback = analysisPlan.get("feedback", f"I analyzed '{prompt[:50]}...' and generated {len(documents)} output documents.")
-
- return {
- "feedback": feedback,
- "documents": documents
- }
-
- except Exception as e:
- logger.error(f"Error during analysis: {str(e)}", exc_info=True)
- return {
- "feedback": f"Error during analysis: {str(e)}",
- "documents": []
- }
-
- def _extractData(self, documents: List[Dict[str, Any]]) -> tuple:
- """
- Extract data from documents, focusing on dataExtracted fields.
-
- Args:
- documents: List of input documents
-
- Returns:
- Tuple of (datasets dictionary, document context text)
- """
- datasets = {}
- documentContext = ""
-
- # Process each document
- for doc in documents:
- docName = doc.get("name", "unnamed")
- if doc.get("ext"):
- docName = f"{docName}.{doc.get('ext')}"
-
- documentContext += f"\n\n--- {docName} ---\n"
-
- # Process contents
- for content in doc.get("contents", []):
- # Focus only on dataExtracted
- if content.get("dataExtracted"):
- extractedText = content.get("dataExtracted", "")
- documentContext += extractedText
-
- # Try to parse as structured data if appropriate
- if docName.lower().endswith(('.csv', '.tsv')):
- try:
- df = pd.read_csv(io.StringIO(extractedText))
- datasets[docName] = df
- except:
- pass
- elif docName.lower().endswith('.json'):
- try:
- jsonData = json.loads(extractedText)
- if isinstance(jsonData, list):
- df = pd.DataFrame(jsonData)
- datasets[docName] = df
- elif isinstance(jsonData, dict):
- # Handle nested JSON structures
- if any(isinstance(v, list) for v in jsonData.values()):
- for key, value in jsonData.items():
- if isinstance(value, list) and len(value) > 0:
- df = pd.DataFrame(value)
- datasets[f"{docName}:{key}"] = df
- else:
- df = pd.DataFrame([jsonData])
- datasets[docName] = df
- except:
- pass
-
- # Try to detect tabular data in text content
- if docName not in datasets and len(extractedText.splitlines()) > 2:
- lines = extractedText.splitlines()
- if any(',' in line for line in lines[:5]):
- try:
- df = pd.read_csv(io.StringIO(extractedText))
- if len(df.columns) > 1:
- datasets[docName] = df
- except:
- pass
- elif any('\t' in line for line in lines[:5]):
- try:
- df = pd.read_csv(io.StringIO(extractedText), sep='\t')
- if len(df.columns) > 1:
- datasets[docName] = df
- except:
- pass
-
- return datasets, documentContext
-
- async def _analyzeTask(self, prompt: str, documentContext: str, datasets: Dict[str, Any], outputSpecs: List[Dict[str, Any]]) -> Dict[str, Any]:
- """
- Analyze the task requirements using AI.
-
- Args:
- prompt: The task prompt
- documentContext: Context from input documents
- datasets: Available datasets
- outputSpecs: Output specifications
-
- Returns:
- Analysis plan dictionary
- """
- # Create analysis prompt
- analysisPrompt = f"""
- Analyze this data analysis task and create a detailed plan:
-
- TASK: {prompt}
-
- DOCUMENT CONTEXT:
- {documentContext}
-
- AVAILABLE DATASETS:
- {json.dumps(datasets, indent=2)}
-
- REQUIRED OUTPUTS:
- {json.dumps(outputSpecs, indent=2)}
-
- Create a detailed analysis plan in JSON format with:
- {{
- "analysisSteps": [
- {{
- "step": "step description",
- "purpose": "why this step is needed",
- "datasets": ["dataset1", "dataset2"],
- "techniques": ["technique1", "technique2"],
- "outputs": ["output1", "output2"]
- }}
- ],
- "visualizations": [
- {{
- "type": "visualization type",
- "purpose": "what it shows",
- "datasets": ["dataset1"],
- "settings": {{"key": "value"}}
- }}
- ],
- "insights": [
- {{
- "type": "insight type",
- "description": "what to look for",
- "datasets": ["dataset1"]
- }}
- ],
- "feedback": "explanation of the analysis approach"
- }}
-
- Respond with ONLY the JSON object, no additional text or explanations.
- """
-
- try:
- # Get analysis plan from AI
- response = await self.service.base.callAi([
- {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. Respond with valid JSON only."},
- {"role": "user", "content": analysisPrompt}
- ], produceUserAnswer=True)
-
- # Extract JSON
- jsonStart = response.find('{')
- jsonEnd = response.rfind('}') + 1
-
- if jsonStart >= 0 and jsonEnd > jsonStart:
- plan = json.loads(response[jsonStart:jsonEnd])
- return plan
- else:
- # Fallback plan
- logger.warning(f"Not able creating analysis plan, generating fallback plan")
- return {
- "analysisSteps": [
- {
- "step": "Basic data analysis",
- "purpose": "Understand the data structure and content",
- "datasets": list(datasets.keys()),
- "techniques": ["summary statistics", "data visualization"],
- "outputs": ["summary report", "basic visualizations"]
- }
- ],
- "visualizations": [
- {
- "type": "basic charts",
- "purpose": "Show data distribution and relationships",
- "datasets": list(datasets.keys()),
- "settings": {}
- }
- ],
- "insights": [
- {
- "type": "basic insights",
- "description": "Key findings from the data",
- "datasets": list(datasets.keys())
- }
- ],
- "feedback": f"I'll analyze the data and provide insights about {prompt}"
- }
-
- except Exception as e:
- logger.warning(f"Error creating analysis plan: {str(e)}")
- # Simple fallback plan
- return {
- "analysisSteps": [
- {
- "step": "Basic data analysis",
- "purpose": "Understand the data structure and content",
- "datasets": list(datasets.keys()),
- "techniques": ["summary statistics", "data visualization"],
- "outputs": ["summary report", "basic visualizations"]
- }
- ],
- "visualizations": [
- {
- "type": "basic charts",
- "purpose": "Show data distribution and relationships",
- "datasets": list(datasets.keys()),
- "settings": {}
- }
- ],
- "insights": [
- {
- "type": "basic insights",
- "description": "Key findings from the data",
- "datasets": list(datasets.keys())
- }
- ],
- "feedback": f"I'll analyze the data and provide insights about {prompt}"
- }
-
- async def _createAnalysisPlan(self, prompt: str) -> Dict[str, Any]:
- """
- Create an analysis plan based on the task prompt.
-
- Args:
- prompt: The task prompt
-
- Returns:
- Analysis plan dictionary
- """
- try:
- # Create analysis prompt
- analysisPrompt = f"""
- Analyze this data analysis task and create a detailed plan:
-
- TASK: {prompt}
-
- Create a detailed analysis plan in JSON format with:
- {{
- "requiresAnalysis": true/false,
- "analysisSteps": [
- {{
- "step": "step description",
- "purpose": "why this step is needed",
- "techniques": ["technique1", "technique2"],
- "outputs": ["output1", "output2"]
- }}
- ],
- "visualizations": [
- {{
- "type": "visualization type",
- "purpose": "what it shows",
- "settings": {{"key": "value"}}
- }}
- ],
- "insights": [
- {{
- "type": "insight type",
- "description": "what to look for"
- }}
- ],
- "feedback": "explanation of the analysis approach"
- }}
-
- Respond with ONLY the JSON object, no additional text or explanations.
- """
-
- # Get analysis plan from AI
- response = await self.service.base.callAi([
- {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. Respond with valid JSON only."},
- {"role": "user", "content": analysisPrompt}
- ], produceUserAnswer=True)
-
- # Extract JSON
- jsonStart = response.find('{')
- jsonEnd = response.rfind('}') + 1
-
- if jsonStart >= 0 and jsonEnd > jsonStart:
- plan = json.loads(response[jsonStart:jsonEnd])
- return plan
- else:
- # Fallback plan
- logger.warning(f"Not able creating analysis plan, generating fallback plan")
- return {
- "requiresAnalysis": True,
- "analysisSteps": [
- {
- "step": "Basic data analysis",
- "purpose": "Understand the data structure and content",
- "techniques": ["summary statistics", "data visualization"],
- "outputs": ["summary report", "basic visualizations"]
- }
- ],
- "visualizations": [
- {
- "type": "basic charts",
- "purpose": "Show data distribution and relationships",
- "settings": {}
- }
- ],
- "insights": [
- {
- "type": "basic insights",
- "description": "Key findings from the data"
- }
- ],
- "feedback": f"I'll analyze the data and provide insights about {prompt}"
- }
-
- except Exception as e:
- logger.warning(f"Error creating analysis plan: {str(e)}")
- # Simple fallback plan
- return {
- "requiresAnalysis": True,
- "analysisSteps": [
- {
- "step": "Basic data analysis",
- "purpose": "Understand the data structure and content",
- "techniques": ["summary statistics", "data visualization"],
- "outputs": ["summary report", "basic visualizations"]
- }
- ],
- "visualizations": [
- {
- "type": "basic charts",
- "purpose": "Show data distribution and relationships",
- "settings": {}
- }
- ],
- "insights": [
- {
- "type": "basic insights",
- "description": "Key findings from the data"
- }
- ],
- "feedback": f"I'll analyze the data and provide insights about {prompt}"
- }
-
- async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str,
- analysisPlan: Dict, description: str) -> Dict:
- """
- Create a visualization based on the analysis plan.
-
- Args:
- datasets: Dictionary of datasets
- prompt: Original task prompt
- outputLabel: Output file label
- analysisPlan: Analysis plan
- description: Output description
-
- Returns:
- Document dictionary with visualization
- """
- try:
- # Get visualization recommendations
- vizRecommendations = analysisPlan.get("visualizations", [])
-
- if not vizRecommendations:
- # Generate visualization recommendations if none provided
- self.service.base.logAdd(analysisPlan.get("workflowId"), "Generating visualization recommendations...", level="info", progress=50)
- vizPrompt = f"""
- Based on this data and task, recommend appropriate visualizations.
-
- TASK: {prompt}
- DESCRIPTION: {description}
-
- DATASETS:
- {json.dumps({name: {"shape": df.shape, "columns": df.columns.tolist()}
- for name, df in datasets.items()}, indent=2)}
-
- Recommend visualizations in JSON format:
- {{
- "visualizations": [
- {{
- "type": "chart_type",
- "dataSource": "dataset_name",
- "variables": ["col1", "col2"],
- "purpose": "explanation"
- }}
- ]
- }}
- """
-
- response = await self.service.base.callAi([
- {"role": "system", "content": "You are a data visualization expert. Recommend appropriate visualizations based on the data and task."},
- {"role": "user", "content": vizPrompt}
- ])
-
- # Extract JSON
- jsonStart = response.find('{')
- jsonEnd = response.rfind('}') + 1
-
- if jsonStart >= 0 and jsonEnd > jsonStart:
- vizData = json.loads(response[jsonStart:jsonEnd])
- vizRecommendations = vizData.get("visualizations", [])
-
- # Determine format from filename
- formatType = outputLabel.split('.')[-1].lower()
- if formatType not in ['png', 'jpg', 'jpeg', 'svg']:
- formatType = 'png'
-
- # If no datasets available, create error message image
- if not datasets:
- plt.figure(figsize=(10, 6))
- plt.text(0.5, 0.5, "No data available for visualization",
- ha='center', va='center', fontsize=14)
- plt.tight_layout()
- imgData = self._getImageBase64(formatType)
- plt.close()
-
- return {
- "label": outputLabel,
- "content": imgData,
- "metadata": {
- "contentType": f"image/{formatType}"
- }
- }
-
- # Prepare dataset info for the first dataset if none specified
- if not vizRecommendations and datasets:
- name, df = next(iter(datasets.items()))
- vizRecommendations = [{
- "type": "auto",
- "dataSource": name,
- "variables": df.columns.tolist()[:5],
- "purpose": "general analysis"
- }]
-
- # Create visualization code prompt
- vizPrompt = f"""
- Generate Python matplotlib/seaborn code to create a visualization for:
-
- TASK: {prompt}
-
- VISUALIZATION REQUIREMENTS:
- - Output format: {formatType}
- - Filename: {outputLabel}
- - Description: {description}
-
- RECOMMENDED VISUALIZATION:
- {json.dumps(vizRecommendations, indent=2)}
-
- AVAILABLE DATASETS:
- """
-
- # Add dataset info for recommended sources
- for viz in vizRecommendations:
- dataSource = viz.get("dataSource")
- if dataSource in datasets:
- df = datasets[dataSource]
- vizPrompt += f"\nDataset '{dataSource}':\n"
- vizPrompt += f"- Shape: {df.shape}\n"
- vizPrompt += f"- Columns: {df.columns.tolist()}\n"
- vizPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"
-
- vizPrompt += """
- Generate ONLY Python code that:
- 1. Uses matplotlib and/or seaborn to create a clear visualization
- 2. Sets figure size to (10, 6)
- 3. Includes appropriate titles, labels, and legend
- 4. Uses professional color schemes
- 5. Handles any missing data gracefully
-
- Return ONLY executable Python code, no explanations or markdown.
- """
-
- try:
- # Get visualization code from AI
- vizCode = await self.service.base.callAi([
- {"role": "system", "content": "You are a data visualization expert. Provide only executable Python code."},
- {"role": "user", "content": vizPrompt}
- ], produceUserAnswer = True)
-
- # Clean code
- vizCode = vizCode.replace("```python", "").replace("```", "").strip()
-
- # Execute visualization code
- plt.figure(figsize=(10, 6))
-
- # Make local variables available to the code
- localVars = {
- "plt": plt,
- "sns": sns,
- "pd": pd,
- "np": __import__('numpy')
- }
-
- # Add datasets to local variables
- for name, df in datasets.items():
- # Create a sanitized variable name
- varName = ''.join(c if c.isalnum() else '_' for c in name)
- localVars[varName] = df
-
- # Also add with standard names for simpler code
- if "df" not in localVars:
- localVars["df"] = df
- elif "df2" not in localVars:
- localVars["df2"] = df
-
- # Execute the visualization code
- exec(vizCode, globals(), localVars)
-
- # Capture the image
- imgData = self._getImageBase64(formatType)
- plt.close()
-
- return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")
-
- except Exception as e:
- logger.error(f"Error creating visualization: {str(e)}", exc_info=True)
-
- # Create error message image
- plt.figure(figsize=(10, 6))
- plt.text(0.5, 0.5, f"Visualization error: {str(e)}",
- ha='center', va='center', fontsize=12)
- plt.tight_layout()
- imgData = self._getImageBase64(formatType)
- plt.close()
-
- return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")
-
- except Exception as e:
- logger.error(f"Error creating visualization: {str(e)}", exc_info=True)
-
- # Create error message image
- plt.figure(figsize=(10, 6))
- plt.text(0.5, 0.5, f"Visualization error: {str(e)}",
- ha='center', va='center', fontsize=12)
- plt.tight_layout()
- imgData = self._getImageBase64(formatType)
- plt.close()
-
- return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")
-
- async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str,
- analysisPlan: Dict, description: str) -> ChatContent:
- """
- Create a data document (CSV, JSON, Excel) from analysis results.
-
- Args:
- datasets: Dictionary of datasets
- prompt: Original task prompt
- outputLabel: Output filename
- analysisPlan: Analysis plan
- description: Output description
-
- Returns:
- ChatContent object
- """
- try:
- # Determine format from filename
- formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "csv"
-
- # Process data based on format
- if formatType == "csv":
- result = self._convertToCsv(datasets)
- elif formatType == "json":
- result = json.dumps(datasets, indent=2)
- elif formatType == "xlsx":
- result = self._convertToExcel(datasets)
- else:
- result = str(datasets)
-
- # Determine content type
- contentType = "text/csv" if formatType == "csv" else \
- "application/json" if formatType == "json" else \
- "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" if formatType == "xlsx" else \
- "text/plain"
-
- return self.formatAgentDocumentOutput(outputLabel, result, contentType)
-
- except Exception as e:
- logger.error(f"Error creating data document: {str(e)}", exc_info=True)
-
- errorContent = f"Error generating {formatType} document: {str(e)}"
- return self.formatAgentDocumentOutput(outputLabel, errorContent, "text/plain")
-
- async def _createTextDocument(self, datasets: Dict, context: str, prompt: str,
- outputLabel: str, formatType: str,
- analysisPlan: Dict, description: str) -> ChatContent:
- """
- Create a text document (markdown, HTML, text) from analysis results.
-
- Args:
- datasets: Dictionary of datasets
- context: Document context
- prompt: Original task prompt
- outputLabel: Output filename
- formatType: Output format
- analysisPlan: Analysis plan
- description: Output description
-
- Returns:
- ChatContent object
- """
- try:
- # Generate dataset summaries
- datasetSummaries = []
- for name, df in datasets.items():
- summary = f"\nDataset: {name}\n"
- summary += f"Shape: {df.shape}\n"
- summary += f"Columns: {', '.join(df.columns)}\n"
- if not df.empty:
- summary += f"Sample data:\n{df.head(3).to_string()}\n"
- datasetSummaries.append(summary)
-
- # Generate analysis prompt
- analysisPrompt = f"""
- Create a detailed {formatType} document for:
-
- TASK: {prompt}
-
- OUTPUT REQUIREMENTS:
- - Format: {formatType}
- - Filename: {outputLabel}
- - Description: {description}
-
- ANALYSIS CONTEXT:
- {json.dumps(analysisPlan, indent=2)}
-
- DATASET SUMMARIES:
- {"".join(datasetSummaries)}
-
- DOCUMENT CONTEXT:
- {context[:2000]}... (truncated)
-
- Create a comprehensive, professional analysis document that addresses the task requirements.
- The document should:
- 1. Have a clear structure with headings and sections
- 2. Include relevant data findings and insights
- 3. Provide appropriate interpretations and recommendations
- 4. Format the content according to the required output format
-
- Your response should be the complete document content in the specified format.
- """
-
- # Get document content from AI
- documentContent = await self.service.base.callAi([
- {"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."},
- {"role": "user", "content": analysisPrompt}
- ], produceUserAnswer = True)
-
- # Clean HTML or Markdown if needed
- if formatType in ["md", "markdown"] and not documentContent.strip().startswith("#"):
- documentContent = f"# Analysis Report\n\n{documentContent}"
- elif formatType == "html" and not "
{documentContent}"
-
- # Determine content type
- contentType = "text/markdown" if formatType in ["md", "markdown"] else \
- "text/html" if formatType == "html" else \
- "text/plain"
-
- return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType)
-
- except Exception as e:
- logger.error(f"Error creating text document: {str(e)}", exc_info=True)
-
- # Create a simple error document
- if formatType in ["md", "markdown"]:
- content = f"# Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
- elif formatType == "html":
- content = f"Error in Analysis
There was an error generating the analysis: {str(e)}
"
- else:
- content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
-
- return self.formatAgentDocumentOutput(outputLabel, content, contentType)
-
- def _getImageBase64(self, formatType: str = 'png') -> str:
- """
- Convert current matplotlib figure to base64 string.
-
- Args:
- formatType: Image format
-
- Returns:
- Base64 encoded string of the image
- """
- buffer = io.BytesIO()
- plt.savefig(buffer, format=formatType, dpi=100)
- buffer.seek(0)
- imageData = buffer.getvalue()
- buffer.close()
-
- # Convert to base64
- return base64.b64encode(imageData).decode('utf-8')
-
- async def _analyzeData(self, task: Dict[str, Any], analysisPlan: Dict[str, Any]) -> Dict[str, Any]:
- """
- Analyze data based on the analysis plan.
-
- Args:
- task: Task dictionary with input documents and specifications
- analysisPlan: Analysis plan from _createAnalysisPlan
-
- Returns:
- Analysis results dictionary
- """
- try:
- # Extract data from input documents
- inputDocuments = task.get("inputDocuments", [])
- datasets, documentContext = self._extractData(inputDocuments)
-
- # Get task information
- prompt = task.get("prompt", "")
- outputSpecs = task.get("outputSpecifications", [])
-
- # Analyze task requirements
- analysisResults = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs)
-
- # Add datasets and context to results
- analysisResults["datasets"] = datasets
- analysisResults["documentContext"] = documentContext
-
- return analysisResults
-
- except Exception as e:
- logger.error(f"Error analyzing data: {str(e)}", exc_info=True)
- return {
- "error": str(e),
- "datasets": {},
- "documentContext": ""
- }
-
- async def _createOutputDocuments(self, prompt: str, analysisResults: Dict[str, Any],
- outputSpecs: List[Dict[str, Any]], analysisPlan: Dict[str, Any]) -> List[Dict[str, Any]]:
- """
- Create output documents based on analysis results.
-
- Args:
- prompt: Original task prompt
- analysisResults: Results from data analysis
- outputSpecs: List of output specifications
- analysisPlan: Analysis plan from _createAnalysisPlan
-
- Returns:
- List of document objects
- """
- documents = []
- datasets = analysisResults.get("datasets", {})
- documentContext = analysisResults.get("documentContext", "")
-
- # Process each output specification
- for spec in outputSpecs:
- outputLabel = spec.get("label", "")
- outputDescription = spec.get("description", "")
-
- # Determine format from filename
- formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt"
-
- try:
- # Create appropriate document based on format
- if formatType in ["png", "jpg", "jpeg", "svg"]:
- # Visualization output
- document = await self._createVisualization(
- datasets, prompt, outputLabel, analysisPlan, outputDescription
- )
- elif formatType in ["csv", "json", "xlsx"]:
- # Data document output
- document = await self._createDataDocument(
- datasets, prompt, outputLabel, analysisPlan, outputDescription
- )
- else:
- # Text document output (markdown, html, text)
- document = await self._createTextDocument(
- datasets, documentContext, prompt, outputLabel, formatType,
- analysisPlan, outputDescription
- )
-
- documents.append(document)
-
- except Exception as e:
- logger.error(f"Error creating output document {outputLabel}: {str(e)}", exc_info=True)
- # Create error document
- errorDoc = self.formatAgentDocumentOutput(
- outputLabel,
- f"Error creating document: {str(e)}",
- "text/plain"
- )
- documents.append(errorDoc)
-
- return documents
-
-
-# Factory function for the Analyst agent
-def getAgentAnalyst():
- """Returns an instance of the Analyst agent."""
- return AgentAnalyst()
\ No newline at end of file
diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py
index db01cd62..a64bb7cd 100644
--- a/modules/interfaces/serviceChatModel.py
+++ b/modules/interfaces/serviceChatModel.py
@@ -3,12 +3,31 @@ Chat model classes for the chat system.
"""
from pydantic import BaseModel, Field
-from typing import List, Dict, Any, Optional
+from typing import List, Dict, Any, Optional, Union
from datetime import datetime
import uuid
from modules.shared.attributeUtils import register_model_labels, ModelMixin
+
+# USER MODELS
+
+class UserInputRequest(BaseModel, ModelMixin):
+ """Data model for a user input request"""
+ prompt: str = Field(description="Prompt for the user")
+ listFileId: List[str] = Field(default_factory=list, description="List of file IDs")
+ userLanguage: str = Field(default="en", description="User's preferred language")
+# Register labels for UserInputRequest
+register_model_labels(
+ "UserInputRequest",
+ {"en": "User Input Request", "fr": "Demande de saisie utilisateur"},
+ {
+ "prompt": {"en": "Prompt", "fr": "Invite"},
+ "listFileId": {"en": "File IDs", "fr": "IDs des fichiers"},
+ "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"}
+ }
+)
+
# WORKFLOW MODELS
class ChatContent(BaseModel, ModelMixin):
@@ -18,7 +37,6 @@ class ChatContent(BaseModel, ModelMixin):
data: str = Field(description="The actual content data")
mimeType: str = Field(description="MIME type of the content")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
-
# Register labels for ChatContent
register_model_labels(
"ChatContent",
@@ -40,7 +58,6 @@ class ChatDocument(BaseModel, ModelMixin):
fileSize: int = Field(description="Size of the file")
mimeType: str = Field(description="MIME type of the file")
contents: List[ChatContent] = Field(default_factory=list, description="List of chat contents")
-
# Register labels for ChatDocument
register_model_labels(
"ChatDocument",
@@ -64,7 +81,6 @@ class ChatStat(BaseModel, ModelMixin):
bytesReceived: Optional[int] = Field(None, description="Number of bytes received")
successRate: Optional[float] = Field(None, description="Success rate of operations")
errorCount: Optional[int] = Field(None, description="Number of errors encountered")
-
# Register labels for ChatStat
register_model_labels(
"ChatStat",
@@ -91,7 +107,6 @@ class ChatLog(BaseModel, ModelMixin):
status: str = Field(description="Status of the log entry")
progress: Optional[int] = Field(None, description="Progress percentage")
performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
-
# Register labels for ChatLog
register_model_labels(
"ChatLog",
@@ -124,7 +139,6 @@ class ChatMessage(BaseModel, ModelMixin):
finishedAt: Optional[str] = Field(None, description="When the message processing finished")
stats: Optional[ChatStat] = Field(None, description="Statistics for this message")
success: Optional[bool] = Field(None, description="Whether the message processing was successful")
-
# Register labels for ChatMessage
register_model_labels(
"ChatMessage",
@@ -146,7 +160,7 @@ register_model_labels(
}
)
-class Task(BaseModel, ModelMixin):
+class AgentTask(BaseModel, ModelMixin):
"""Data model for a task"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
workflowId: str = Field(description="Foreign key to workflow")
@@ -162,10 +176,9 @@ class Task(BaseModel, ModelMixin):
startedAt: str = Field(description="When the task started")
finishedAt: Optional[str] = Field(None, description="When the task finished")
performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
-
-# Register labels for Task
+# Register labels for AgentTask
register_model_labels(
- "Task",
+ "AgentTask",
{"en": "Task", "fr": "Tâche"},
{
"id": {"en": "ID", "fr": "ID"},
@@ -185,6 +198,28 @@ register_model_labels(
}
)
+class Agent(BaseModel, ModelMixin):
+ """Data model for an agent"""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
+ name: str = Field(description="Name of the agent")
+ description: str = Field(description="Description of the agent")
+ capabilities: List[str] = Field(default_factory=list, description="List of agent capabilities")
+ performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
+# Register labels for Agent
+register_model_labels(
+ "Agent",
+ {"en": "Agent", "fr": "Agent"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "name": {"en": "Name", "fr": "Nom"},
+ "description": {"en": "Description", "fr": "Description"},
+ "capabilities": {"en": "Capabilities", "fr": "Capacités"},
+ "performance": {"en": "Performance", "fr": "Performance"}
+ }
+)
+
+# WORKFLOW MODELS
+
class ChatWorkflow(BaseModel, ModelMixin):
"""Data model for a chat workflow"""
id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
@@ -197,8 +232,7 @@ class ChatWorkflow(BaseModel, ModelMixin):
logs: List[ChatLog] = Field(default_factory=list, description="Workflow logs")
messages: List[ChatMessage] = Field(default_factory=list, description="Messages in the workflow")
stats: Optional[ChatStat] = Field(None, description="Workflow statistics")
- tasks: List[Task] = Field(default_factory=list, description="List of tasks in the workflow")
-
+ tasks: List[AgentTask] = Field(default_factory=list, description="List of tasks in the workflow")
# Register labels for ChatWorkflow
register_model_labels(
"ChatWorkflow",
@@ -218,151 +252,124 @@ register_model_labels(
}
)
-# AGENT AND TASK MODELS
+# DOCUMENT MODELS
-class Agent(BaseModel, ModelMixin):
- """Data model for an agent"""
- id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Primary key")
- name: str = Field(description="Name of the agent")
- description: str = Field(description="Description of the agent")
- capabilities: List[str] = Field(default_factory=list, description="List of agent capabilities")
- performance: Optional[Dict[str, Any]] = Field(None, description="Performance metrics")
+class DocumentExtraction(BaseModel, ModelMixin):
+ """Data model for document extraction history"""
+ timestamp: str = Field(description="Timestamp of extraction")
+ type: str = Field(description="Type of document")
+ sections: List[str] = Field(default_factory=list, description="Extracted sections")
+ metadata: Dict[str, Any] = Field(default_factory=dict, description="Extraction metadata")
-# Register labels for Agent
+# Register labels for DocumentExtraction
register_model_labels(
- "Agent",
- {"en": "Agent", "fr": "Agent"},
+ "DocumentExtraction",
+ {"en": "Document Extraction", "fr": "Extraction de document"},
+ {
+ "timestamp": {"en": "Timestamp", "fr": "Horodatage"},
+ "type": {"en": "Type", "fr": "Type"},
+ "sections": {"en": "Sections", "fr": "Sections"},
+ "metadata": {"en": "Metadata", "fr": "Métadonnées"}
+ }
+)
+
+class DocumentContext(BaseModel, ModelMixin):
+ """Data model for document context"""
+ id: str = Field(description="Document ID")
+ extractionHistory: List[DocumentExtraction] = Field(default_factory=list, description="History of extractions")
+ relevantSections: List[str] = Field(default_factory=list, description="Relevant sections")
+ processingStatus: Dict[str, str] = Field(default_factory=dict, description="Processing status")
+
+# Register labels for DocumentContext
+register_model_labels(
+ "DocumentContext",
+ {"en": "Document Context", "fr": "Contexte de document"},
+ {
+ "id": {"en": "ID", "fr": "ID"},
+ "extractionHistory": {"en": "Extraction History", "fr": "Historique d'extraction"},
+ "relevantSections": {"en": "Relevant Sections", "fr": "Sections pertinentes"},
+ "processingStatus": {"en": "Processing Status", "fr": "Statut de traitement"}
+ }
+)
+
+class DocumentMetadata(BaseModel, ModelMixin):
+ """Data model for document metadata"""
+ type: str = Field(description="Document type")
+ format: str = Field(description="Document format")
+ size: int = Field(description="Document size in bytes")
+ pages: Optional[int] = Field(None, description="Number of pages")
+ sections: Optional[List[str]] = Field(None, description="Document sections")
+ error: Optional[str] = Field(None, description="Processing error if any")
+
+# Register labels for DocumentMetadata
+register_model_labels(
+ "DocumentMetadata",
+ {"en": "Document Metadata", "fr": "Métadonnées de document"},
+ {
+ "type": {"en": "Type", "fr": "Type"},
+ "format": {"en": "Format", "fr": "Format"},
+ "size": {"en": "Size", "fr": "Taille"},
+ "pages": {"en": "Pages", "fr": "Pages"},
+ "sections": {"en": "Sections", "fr": "Sections"},
+ "error": {"en": "Error", "fr": "Erreur"}
+ }
+)
+
+class ImageData(BaseModel, ModelMixin):
+ """Data model for image data"""
+ data: str = Field(description="Base64 encoded image data")
+ format: str = Field(description="Image format")
+ page: Optional[int] = Field(None, description="Page number if from a multi-page document")
+ index: Optional[int] = Field(None, description="Image index in the document")
+
+# Register labels for ImageData
+register_model_labels(
+ "ImageData",
+ {"en": "Image Data", "fr": "Données d'image"},
+ {
+ "data": {"en": "Image Data", "fr": "Données d'image"},
+ "format": {"en": "Format", "fr": "Format"},
+ "page": {"en": "Page", "fr": "Page"},
+ "index": {"en": "Index", "fr": "Index"}
+ }
+)
+
+class DocumentContent(BaseModel, ModelMixin):
+ """Data model for document content"""
+ text: Optional[str] = Field(None, description="Extracted text content")
+ data: Optional[Dict[str, Any]] = Field(None, description="Structured data content")
+ images: Optional[List[ImageData]] = Field(None, description="Extracted images")
+ metadata: DocumentMetadata = Field(description="Document metadata")
+
+# Register labels for DocumentContent
+register_model_labels(
+ "DocumentContent",
+ {"en": "Document Content", "fr": "Contenu de document"},
+ {
+ "text": {"en": "Text", "fr": "Texte"},
+ "data": {"en": "Data", "fr": "Données"},
+ "images": {"en": "Images", "fr": "Images"},
+ "metadata": {"en": "Metadata", "fr": "Métadonnées"}
+ }
+)
+
+class ProcessedDocument(BaseModel, ModelMixin):
+ """Data model for processed document"""
+ id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Document ID")
+ name: str = Field(description="Document name")
+ contentType: str = Field(description="Content type")
+ content: DocumentContent = Field(description="Document content")
+ context: Optional[DocumentContext] = Field(None, description="Document context")
+
+# Register labels for ProcessedDocument
+register_model_labels(
+ "ProcessedDocument",
+ {"en": "Processed Document", "fr": "Document traité"},
{
"id": {"en": "ID", "fr": "ID"},
"name": {"en": "Name", "fr": "Nom"},
- "description": {"en": "Description", "fr": "Description"},
- "capabilities": {"en": "Capabilities", "fr": "Capacités"},
- "performance": {"en": "Performance", "fr": "Performance"}
+ "contentType": {"en": "Content Type", "fr": "Type de contenu"},
+ "content": {"en": "Content", "fr": "Contenu"},
+ "context": {"en": "Context", "fr": "Contexte"}
}
)
-
-class AgentResponse(BaseModel, ModelMixin):
- """Data model for an agent response"""
- success: bool = Field(description="Whether the agent execution was successful")
- message: ChatMessage = Field(description="Response message from the agent")
- performance: Dict[str, Any] = Field(default_factory=dict, description="Performance metrics")
- progress: float = Field(description="Task progress (0-100)")
-
-# Register labels for AgentResponse
-register_model_labels(
- "AgentResponse",
- {"en": "Agent Response", "fr": "Réponse de l'agent"},
- {
- "success": {"en": "Success", "fr": "Succès"},
- "message": {"en": "Message", "fr": "Message"},
- "performance": {"en": "Performance", "fr": "Performance"},
- "progress": {"en": "Progress", "fr": "Progression"}
- }
-)
-
-class TaskPlan(BaseModel, ModelMixin):
- """Data model for a task plan"""
- fileList: List[str] = Field(default_factory=list, description="List of files")
- tasks: List[Task] = Field(default_factory=list, description="List of tasks in the plan")
- userLanguage: str = Field(description="User's preferred language")
- userResponse: str = Field(description="User's response or feedback")
-
-# Register labels for TaskPlan
-register_model_labels(
- "TaskPlan",
- {"en": "Task Plan", "fr": "Plan de tâches"},
- {
- "fileList": {"en": "File List", "fr": "Liste de fichiers"},
- "tasks": {"en": "Tasks", "fr": "Tâches"},
- "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"},
- "userResponse": {"en": "User Response", "fr": "Réponse de l'utilisateur"}
- }
-)
-
-class UserInputRequest(BaseModel, ModelMixin):
- """Data model for a user input request"""
- prompt: str = Field(description="Prompt for the user")
- listFileId: List[int] = Field(default_factory=list, description="List of file IDs")
- userLanguage: str = Field(default="en", description="User's preferred language")
-
-# Register labels for UserInputRequest
-register_model_labels(
- "UserInputRequest",
- {"en": "User Input Request", "fr": "Demande de saisie utilisateur"},
- {
- "prompt": {"en": "Prompt", "fr": "Invite"},
- "listFileId": {"en": "File IDs", "fr": "IDs des fichiers"},
- "userLanguage": {"en": "User Language", "fr": "Langue de l'utilisateur"}
- }
-)
-
-class AgentProfile(BaseModel, ModelMixin):
- """Model for agent profile information."""
- id: str
- name: str
- description: str
- capabilities: List[str] = Field(default_factory=list)
- isAvailable: bool = True
- lastActive: Optional[datetime] = None
- stats: Optional[Dict[str, Any]] = None
-
-# Register labels for AgentProfile
-register_model_labels(
- "AgentProfile",
- {"en": "Agent Profile", "fr": "Profil de l'agent"},
- {
- "id": {"en": "ID", "fr": "ID"},
- "name": {"en": "Name", "fr": "Nom"},
- "description": {"en": "Description", "fr": "Description"},
- "capabilities": {"en": "Capabilities", "fr": "Capacités"},
- "isAvailable": {"en": "Available", "fr": "Disponible"},
- "lastActive": {"en": "Last Active", "fr": "Dernière activité"},
- "stats": {"en": "Statistics", "fr": "Statistiques"}
- }
-)
-
-class AgentHandover(BaseModel, ModelMixin):
- """Data model for agent handover information."""
- # Status values
- status: str = Field(default="pending", description="One of: pending, success, failed, retry")
- error: Optional[str] = Field(None, description="Error message if any")
- progress: float = Field(default=0.0, description="Progress percentage")
-
- # Document information
- documentsUserInitial: List[Dict[str, Any]] = Field(default_factory=list, description="Initial user documents")
- documentsInput: List[Dict[str, Any]] = Field(default_factory=list, description="Input documents")
- documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents")
-
- # Prompt information
- promptUserInitial: str = Field(default="", description="Initial user prompt")
- promptFromFinishedAgent: str = Field(default="", description="Prompt from finished agent")
- promptForNextAgent: str = Field(default="", description="Prompt for next agent")
-
- # Agent information
- currentAgent: Optional[str] = Field(None, description="Current agent name")
- nextAgent: Optional[str] = Field(None, description="Next agent name")
-
- # Timing information
- startedAt: Optional[str] = Field(None, description="Start timestamp")
- finishedAt: Optional[str] = Field(None, description="Finish timestamp")
-
-# Register labels for AgentHandover
-register_model_labels(
- "AgentHandover",
- {"en": "Agent Handover", "fr": "Transfert d'agent"},
- {
- "status": {"en": "Status", "fr": "Statut"},
- "error": {"en": "Error", "fr": "Erreur"},
- "progress": {"en": "Progress", "fr": "Progression"},
- "documentsUserInitial": {"en": "Initial User Documents", "fr": "Documents utilisateur initiaux"},
- "documentsInput": {"en": "Input Documents", "fr": "Documents d'entrée"},
- "documentsOutput": {"en": "Output Documents", "fr": "Documents de sortie"},
- "promptUserInitial": {"en": "Initial User Prompt", "fr": "Invite utilisateur initiale"},
- "promptFromFinishedAgent": {"en": "Finished Agent Prompt", "fr": "Invite de l'agent terminé"},
- "promptForNextAgent": {"en": "Next Agent Prompt", "fr": "Invite pour le prochain agent"},
- "currentAgent": {"en": "Current Agent", "fr": "Agent actuel"},
- "nextAgent": {"en": "Next Agent", "fr": "Prochain agent"},
- "startedAt": {"en": "Started At", "fr": "Démarré le"},
- "finishedAt": {"en": "Finished At", "fr": "Terminé le"}
- }
-)
\ No newline at end of file
diff --git a/modules/methods/methodBase.py b/modules/methods/methodBase.py
new file mode 100644
index 00000000..7618b5a8
--- /dev/null
+++ b/modules/methods/methodBase.py
@@ -0,0 +1,74 @@
+from enum import Enum
+from typing import Dict, List, Optional, Any, Literal, Callable
+from datetime import datetime, UTC
+from pydantic import BaseModel, Field
+
+class AuthSource(str, Enum):
+ LOCAL = "local"
+ MSFT = "msft"
+ GOOGLE = "google"
+ # Add more auth sources as needed
+
+class MethodParameter(BaseModel):
+    """Describes one parameter of a method action: name, type label, required flag, optional validator, description."""
+    name: str
+    type: str
+    required: bool
+    validation: Optional[Callable[..., Any]] = None  # builtin `callable` is a function, not a type usable as an annotation
+    description: str
+
+class MethodResult(BaseModel):
+ """Model for method results"""
+ success: bool
+ data: Dict[str, Any]
+ metadata: Dict[str, Any] = Field(default_factory=dict)
+ validation: List[str] = Field(default_factory=list)
+
+class MethodBase:
+    """Base class for all methods; subclasses set name/description/auth_source and override actions and execute."""
+
+    def __init__(self):
+        self.name: str = ""  # assigned (not just annotated) so the attribute always exists
+        self.description: str = ""
+        self.auth_source: AuthSource = AuthSource.LOCAL  # Default to local auth
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Mapping of action name to its definition (description, parameters, ...); must be overridden."""
+        raise NotImplementedError
+
+    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """Run `action` with `parameters` and optional `auth_data`; must be overridden."""
+        raise NotImplementedError
+
+    async def validate_parameters(self, action: str, parameters: Dict[str, Any]) -> bool:
+        """Return True when `action` is known and every parameter marked required is present in `parameters`."""
+        action_def = self.actions.get(action)  # read the property once instead of twice
+        if action_def is None:
+            return False
+
+        required_params = {k for k, v in action_def.get('parameters', {}).items() if v.get('required', False)}
+        return all(param in parameters for param in required_params)
+
+    async def rollback(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> None:
+        """Undo `action` if needed; the base implementation does nothing."""
+        pass
+
+    def _validate_auth(self, auth_data: Optional[Dict[str, Any]] = None) -> bool:
+        """Return True for local auth, otherwise only when auth_data['source'] equals this method's auth_source."""
+        if self.auth_source == AuthSource.LOCAL:
+            return True
+        return bool(auth_data and auth_data.get('source') == self.auth_source)
+
+    def _create_result(self, success: bool, data: Dict[str, Any], metadata: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """Build a MethodResult with the given success flag and data, empty metadata by default, no validation messages."""
+        return MethodResult(
+            success=success,
+            data=data,
+            metadata=metadata or {},
+            validation=[]
+        )
+
+    def _add_validation_message(self, result: MethodResult, message: str) -> None:
+        """Append `message` to result.validation in place."""
+        result.validation.append(message)
\ No newline at end of file
diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py
new file mode 100644
index 00000000..d39c1b08
--- /dev/null
+++ b/modules/methods/methodCoder.py
@@ -0,0 +1,272 @@
+from typing import Dict, Any, Optional
+import logging
+import ast
+import re
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+
+logger = logging.getLogger(__name__)
+
+class MethodCoder(MethodBase):
+ """Coder method implementation for code operations"""
+
+ def __init__(self):
+ super().__init__()
+ self.name = "coder"
+ self.description = "Handle code operations like analysis, generation, and refactoring"
+ self.auth_source = AuthSource.LOCAL # Code operations typically don't need auth
+
+ @property
+ def actions(self) -> Dict[str, Dict[str, Any]]:
+ """Available actions and their parameters"""
+ return {
+ "analyze": {
+ "description": "Analyze code structure and quality",
+ "retryMax": 2,
+ "timeout": 30,
+ "parameters": {
+ "code": {"type": "string", "required": True},
+ "language": {"type": "string", "required": False},
+ "metrics": {"type": "array", "items": "string", "required": False}
+ }
+ },
+ "generate": {
+ "description": "Generate code based on requirements",
+ "retryMax": 2,
+ "timeout": 60,
+ "parameters": {
+ "requirements": {"type": "string", "required": True},
+ "language": {"type": "string", "required": False},
+ "style": {"type": "string", "required": False}
+ }
+ },
+ "refactor": {
+ "description": "Refactor code for better quality",
+ "retryMax": 2,
+ "timeout": 60,
+ "parameters": {
+ "code": {"type": "string", "required": True},
+ "language": {"type": "string", "required": False},
+ "improvements": {"type": "array", "items": "string", "required": False}
+ }
+ }
+ }
+
+ async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+ """Execute coder method"""
+ try:
+ # Validate parameters
+ if not await self.validate_parameters(action, parameters):
+ return self._create_result(
+ success=False,
+ data={"error": f"Invalid parameters for {action}"}
+ )
+
+ # Execute action
+ if action == "analyze":
+ return await self._analyze_code(parameters)
+ elif action == "generate":
+ return await self._generate_code(parameters)
+ elif action == "refactor":
+ return await self._refactor_code(parameters)
+ else:
+ return self._create_result(
+ success=False,
+ data={"error": f"Unknown action: {action}"}
+ )
+
+ except Exception as e:
+ logger.error(f"Error executing coder {action}: {e}")
+ return self._create_result(
+ success=False,
+ data={"error": str(e)}
+ )
+
+    async def _analyze_code(self, parameters: Dict[str, Any]) -> MethodResult:
+        """Analyze Python source in parameters["code"]: basic counts plus docstring, length and style issues."""
+        try:
+            code = parameters["code"]
+            language = parameters.get("language", "python")
+            metrics = parameters.get("metrics", ["complexity", "style", "documentation"])
+
+            analysis = {}
+
+            if language.lower() == "python":
+                func_types = (ast.FunctionDef, ast.AsyncFunctionDef)  # async defs are functions too
+                try:
+                    tree = ast.parse(code)  # raises SyntaxError on invalid source, handled below
+
+                    # Calculate basic metrics
+                    analysis["metrics"] = {
+                        "lines": len(code.splitlines()),
+                        "classes": len([node for node in ast.walk(tree) if isinstance(node, ast.ClassDef)]),
+                        "functions": len([node for node in ast.walk(tree) if isinstance(node, func_types)]),
+                        "imports": len([node for node in ast.walk(tree) if isinstance(node, (ast.Import, ast.ImportFrom))])
+                    }
+
+                    # Check for common issues
+                    analysis["issues"] = []
+
+                    # Check for missing docstrings
+                    if "documentation" in metrics:
+                        for node in ast.walk(tree):
+                            if isinstance(node, (ast.ClassDef, *func_types)) and not ast.get_docstring(node):
+                                analysis["issues"].append({
+                                    "type": "missing_docstring",
+                                    "line": node.lineno,
+                                    "name": node.name
+                                })
+
+                    # Check for long functions
+                    if "complexity" in metrics:
+                        for node in ast.walk(tree):
+                            if isinstance(node, func_types):
+                                body_lines = (node.end_lineno or node.lineno) - node.lineno + 1  # source lines, not statement count
+                                if body_lines > 20:  # Arbitrary threshold
+                                    analysis["issues"].append({
+                                        "type": "long_function",
+                                        "line": node.lineno,
+                                        "name": node.name,
+                                        "lines": body_lines
+                                    })
+
+                    # Check for style issues
+                    if "style" in metrics:
+                        # Check line length
+                        for i, line in enumerate(code.splitlines(), 1):
+                            if len(line) > 100:  # threshold used here; PEP 8 itself recommends 79
+                                analysis["issues"].append({
+                                    "type": "line_too_long",
+                                    "line": i,
+                                    "length": len(line)
+                                })
+
+                        # Check for mixed tabs and spaces (indentation only; spaces inside a line are normal)
+                        if re.search(r"^[ \t]*\t", code, re.M) and re.search(r"^[ \t]* ", code, re.M):
+                            analysis["issues"].append({
+                                "type": "mixed_tabs_spaces",
+                                "message": "Code mixes tabs and spaces"
+                            })
+
+                except SyntaxError as e:
+                    return self._create_result(
+                        success=False,
+                        data={"error": f"Syntax error: {str(e)}"}
+                    )
+            else:
+                # TODO: Implement analysis for other languages
+                return self._create_result(
+                    success=False,
+                    data={"error": f"Unsupported language: {language}"}
+                )
+
+            return self._create_result(
+                success=True,
+                data={
+                    "language": language,
+                    "analysis": analysis
+                }
+            )
+        except Exception as e:
+            logger.error(f"Error analyzing code: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Analysis failed: {str(e)}"}
+            )
+
+ async def _generate_code(self, parameters: Dict[str, Any]) -> MethodResult:
+ """Generate code based on requirements"""
+ try:
+ requirements = parameters["requirements"]
+ language = parameters.get("language", "python")
+ style = parameters.get("style", "standard")
+
+ # TODO: Implement code generation using AI or templates
+ # This is a placeholder implementation
+ if language.lower() == "python":
+ # Generate a simple Python class based on requirements
+ class_name = re.sub(r'[^a-zA-Z0-9]', '', requirements.split()[0].title())
+ code = f"""class {class_name}:
+ \"\"\"
+ {requirements}
+ \"\"\"
+
+ def __init__(self):
+ pass
+
+ def process(self):
+ pass
+"""
+ else:
+ return self._create_result(
+ success=False,
+ data={"error": f"Unsupported language: {language}"}
+ )
+
+ return self._create_result(
+ success=True,
+ data={
+ "language": language,
+ "code": code
+ }
+ )
+ except Exception as e:
+ logger.error(f"Error generating code: {e}")
+ return self._create_result(
+ success=False,
+ data={"error": f"Generation failed: {str(e)}"}
+ )
+
+ async def _refactor_code(self, parameters: Dict[str, Any]) -> MethodResult:
+ """Refactor code for better quality"""
+ try:
+ code = parameters["code"]
+ language = parameters.get("language", "python")
+ improvements = parameters.get("improvements", ["style", "complexity"])
+
+ if language.lower() == "python":
+ # Parse Python code
+ try:
+ tree = ast.parse(code)
+
+ # Apply improvements
+ if "style" in improvements:
+ # Format code (placeholder)
+ code = code.strip()
+
+ if "complexity" in improvements:
+ # TODO: Implement complexity reduction
+ pass
+
+ if "documentation" in improvements:
+ # Add missing docstrings
+ for node in ast.walk(tree):
+ if isinstance(node, (ast.ClassDef, ast.FunctionDef)) and not ast.get_docstring(node):
+ # TODO: Generate docstring
+ pass
+
+ except SyntaxError as e:
+ return self._create_result(
+ success=False,
+ data={"error": f"Syntax error: {str(e)}"}
+ )
+ else:
+ return self._create_result(
+ success=False,
+ data={"error": f"Unsupported language: {language}"}
+ )
+
+ return self._create_result(
+ success=True,
+ data={
+ "language": language,
+ "code": code,
+ "improvements": improvements
+ }
+ )
+ except Exception as e:
+ logger.error(f"Error refactoring code: {e}")
+ return self._create_result(
+ success=False,
+ data={"error": f"Refactoring failed: {str(e)}"}
+ )
\ No newline at end of file
diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py
new file mode 100644
index 00000000..26aab156
--- /dev/null
+++ b/modules/methods/methodDocument.py
@@ -0,0 +1,287 @@
+from typing import Dict, Any, Optional
+import logging
+import os
+from pathlib import Path
+import docx
+import PyPDF2
+import json
+import yaml
+import xml.etree.ElementTree as ET
+from datetime import datetime, UTC
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+
+logger = logging.getLogger(__name__)
+
+class MethodDocument(MethodBase):
+ """Document method implementation for document operations"""
+
+ def __init__(self):
+ super().__init__()
+ self.name = "document"
+ self.description = "Handle document operations like reading, writing, and converting documents"
+ self.auth_source = AuthSource.LOCAL # Document operations typically don't need auth
+
+ @property
+ def actions(self) -> Dict[str, Dict[str, Any]]:
+ """Available actions and their parameters"""
+ return {
+ "read": {
+ "description": "Read document content",
+ "retryMax": 2,
+ "timeout": 30,
+ "parameters": {
+ "path": {"type": "string", "required": True},
+ "format": {"type": "string", "required": False},
+ "encoding": {"type": "string", "required": False},
+ "includeMetadata": {"type": "boolean", "required": False}
+ }
+ },
+ "write": {
+ "description": "Write content to document",
+ "retryMax": 2,
+ "timeout": 30,
+ "parameters": {
+ "path": {"type": "string", "required": True},
+ "content": {"type": "string", "required": True},
+ "format": {"type": "string", "required": False},
+ "encoding": {"type": "string", "required": False},
+ "template": {"type": "string", "required": False}
+ }
+ },
+ "convert": {
+ "description": "Convert document between formats",
+ "retryMax": 2,
+ "timeout": 60,
+ "parameters": {
+ "sourcePath": {"type": "string", "required": True},
+ "targetPath": {"type": "string", "required": True},
+ "sourceFormat": {"type": "string", "required": False},
+ "targetFormat": {"type": "string", "required": False},
+ "options": {"type": "object", "required": False}
+ }
+ }
+ }
+
+ async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+ """Execute document method"""
+ try:
+ # Validate parameters
+ if not await self.validate_parameters(action, parameters):
+ return self._create_result(
+ success=False,
+ data={"error": f"Invalid parameters for {action}"}
+ )
+
+ # Execute action
+ if action == "read":
+ return await self._read_document(parameters)
+ elif action == "write":
+ return await self._write_document(parameters)
+ elif action == "convert":
+ return await self._convert_document(parameters)
+ else:
+ return self._create_result(
+ success=False,
+ data={"error": f"Unknown action: {action}"}
+ )
+
+ except Exception as e:
+ logger.error(f"Error executing document {action}: {e}")
+ return self._create_result(
+ success=False,
+ data={"error": str(e)}
+ )
+
+    async def _read_document(self, parameters: Dict[str, Any]) -> MethodResult:
+        """Read parameters["path"] as txt/md/docx/pdf/json/yaml/xml and return its content (plus file metadata on request)."""
+        try:
+            path = Path(parameters["path"])
+            if not path.exists():
+                return self._create_result(
+                    success=False,
+                    data={"error": f"File not found: {path}"}
+                )
+
+            # Determine format if not specified
+            format = parameters.get("format")
+            if not format:
+                format = path.suffix[1:] if path.suffix else "txt"
+
+            # Read content based on format
+            fmt = format.lower()  # normalise once; the original spelling is still what gets reported
+            encoding = parameters.get("encoding", "utf-8")
+            include_metadata = parameters.get("includeMetadata", False)
+
+            if fmt in ["txt", "md"]:
+                with open(path, "r", encoding=encoding) as f:
+                    content = f.read()
+            elif fmt == "docx":
+                doc = docx.Document(path)
+                content = "\n".join([paragraph.text for paragraph in doc.paragraphs])
+            elif fmt == "pdf":
+                with open(path, "rb") as f:
+                    pdf = PyPDF2.PdfReader(f)
+                    content = "\n".join([page.extract_text() or "" for page in pdf.pages])  # tolerate pages yielding no text
+            elif fmt == "json":
+                with open(path, "r", encoding=encoding) as f:
+                    content = json.load(f)  # parsed object, not a string
+            elif fmt in ["yaml", "yml"]:  # .yml is the same format under its other common suffix
+                with open(path, "r", encoding=encoding) as f:
+                    content = yaml.safe_load(f)  # parsed object, not a string
+            elif fmt == "xml":
+                tree = ET.parse(path)
+                root = tree.getroot()
+                content = ET.tostring(root, encoding=encoding).decode(encoding)
+            else:
+                return self._create_result(
+                    success=False,
+                    data={"error": f"Unsupported format: {format}"}
+                )
+
+            result = {
+                "path": str(path),
+                "format": format,
+                "content": content
+            }
+
+            if include_metadata:
+                result["metadata"] = {
+                    "size": path.stat().st_size,
+                    "modified": datetime.fromtimestamp(path.stat().st_mtime, UTC).isoformat(),
+                    "created": datetime.fromtimestamp(path.stat().st_ctime, UTC).isoformat()  # NOTE: st_ctime is metadata-change time on Unix
+                }
+
+            return self._create_result(
+                success=True,
+                data=result
+            )
+        except Exception as e:
+            logger.error(f"Error reading document: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Read failed: {str(e)}"}
+            )
+
+    async def _write_document(self, parameters: Dict[str, Any]) -> MethodResult:
+        """Write parameters["content"] to parameters["path"] as txt/md/xml/docx/json/yaml; PDF output is not implemented."""
+        try:
+            path = Path(parameters["path"])
+
+            # Determine format if not specified
+            format = parameters.get("format")
+            if not format:
+                format = path.suffix[1:] if path.suffix else "txt"
+            fmt = format.lower()  # normalise once; the original spelling is still what gets reported
+
+            # Write content based on format
+            encoding = parameters.get("encoding", "utf-8")
+            content = parameters["content"]
+            template = parameters.get("template")
+
+            if fmt == "pdf":
+                # TODO: Implement PDF writing
+                return self._create_result(
+                    success=False,
+                    data={"error": "PDF writing not implemented yet"}
+                )
+
+            if fmt not in ["txt", "md", "xml", "docx", "json", "yaml", "yml"]:
+                return self._create_result(
+                    success=False,
+                    data={"error": f"Unsupported format: {format}"}
+                )
+
+            # Create the target directory only after the format is known to be writable
+            path.parent.mkdir(parents=True, exist_ok=True)
+
+            if fmt in ["txt", "md", "xml"]:  # all three are written verbatim as text
+                with open(path, "w", encoding=encoding) as f:
+                    f.write(content)
+            elif fmt == "docx":
+                if template:
+                    doc = docx.Document(template)
+                else:
+                    doc = docx.Document()
+                doc.add_paragraph(content)
+                doc.save(path)
+            elif fmt == "json":
+                with open(path, "w", encoding=encoding) as f:
+                    json.dump(content, f, indent=2)
+            else:  # "yaml" / "yml" — the only formats left after the check above
+                with open(path, "w", encoding=encoding) as f:
+                    yaml.dump(content, f)
+
+            return self._create_result(
+                success=True,
+                data={
+                    "path": str(path),
+                    "format": format,
+                    "size": path.stat().st_size,
+                    "modified": datetime.now(UTC).isoformat()
+                }
+            )
+        except Exception as e:
+            logger.error(f"Error writing document: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Write failed: {str(e)}"}
+            )
+
+ async def _convert_document(self, parameters: Dict[str, Any]) -> MethodResult:
+ """Convert document between formats"""
+ try:
+ source_path = Path(parameters["sourcePath"])
+ target_path = Path(parameters["targetPath"])
+
+ if not source_path.exists():
+ return self._create_result(
+ success=False,
+ data={"error": f"Source file not found: {source_path}"}
+ )
+
+ # Determine formats if not specified
+ source_format = parameters.get("sourceFormat")
+ if not source_format:
+ source_format = source_path.suffix[1:] if source_path.suffix else "txt"
+
+ target_format = parameters.get("targetFormat")
+ if not target_format:
+ target_format = target_path.suffix[1:] if target_path.suffix else "txt"
+
+ # Read source content
+ source_content = await self._read_document({
+ "path": str(source_path),
+ "format": source_format
+ })
+
+ if not source_content.success:
+ return source_content
+
+ # Write target content
+ target_content = await self._write_document({
+ "path": str(target_path),
+ "content": source_content.data["content"],
+ "format": target_format
+ })
+
+ if not target_content.success:
+ return target_content
+
+ return self._create_result(
+ success=True,
+ data={
+ "sourcePath": str(source_path),
+ "targetPath": str(target_path),
+ "sourceFormat": source_format,
+ "targetFormat": target_format,
+ "size": target_path.stat().st_size,
+ "modified": datetime.now(UTC).isoformat()
+ }
+ )
+ except Exception as e:
+ logger.error(f"Error converting document: {e}")
+ return self._create_result(
+ success=False,
+ data={"error": f"Conversion failed: {str(e)}"}
+ )
\ No newline at end of file
diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py
new file mode 100644
index 00000000..908cbb3f
--- /dev/null
+++ b/modules/methods/methodOutlook.py
@@ -0,0 +1,203 @@
+from typing import Dict, Any, Optional
+import logging
+from datetime import datetime, UTC
+from O365 import Account, MSGraphProtocol
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.models.userConnection import UserConnection
+
+logger = logging.getLogger(__name__)
+
+class MethodOutlook(MethodBase):
+    """Outlook method implementation for email operations"""
+
+    def __init__(self):
+        super().__init__()
+        self.name = "outlook"
+        self.description = "Handle Outlook email operations like reading and sending emails"
+        self.auth_source = AuthSource.MICROSOFT
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Describe the supported actions, their retry/timeout settings and parameters"""
+        return {
+            "readMails": {
+                "description": "Read emails from Outlook",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "folder": {"type": "string", "required": False},
+                    "query": {"type": "string", "required": False},
+                    "maxResults": {"type": "number", "required": False},
+                    "includeAttachments": {"type": "boolean", "required": False}
+                }
+            },
+            "sendMail": {
+                "description": "Send email through Outlook",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "to": {"type": "array", "items": "string", "required": True},
+                    "subject": {"type": "string", "required": True},
+                    "body": {"type": "string", "required": True},
+                    "cc": {"type": "array", "items": "string", "required": False},
+                    "bcc": {"type": "array", "items": "string", "required": False},
+                    "attachments": {"type": "array", "items": "string", "required": False}
+                }
+            }
+        }
+
+    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """
+        Validate the request and dispatch it to the matching Outlook handler.
+
+        Args:
+            action: "readMails" or "sendMail".
+            parameters: Parameters for the action, as declared in ``actions``.
+            auth_data: Must contain a "userConnection" entry.
+
+        Returns:
+            The handler's result, or a failure result when validation fails, the
+            connection is missing, the action is unknown or an exception is raised.
+        """
+        try:
+            # Validate parameters
+            if not await self.validate_parameters(action, parameters):
+                return self._create_result(success=False, data={"error": f"Invalid parameters for {action}"})
+
+            # Get UserConnection from auth_data
+            if not auth_data or "userConnection" not in auth_data:
+                return self._create_result(
+                    success=False,
+                    data={"error": "UserConnection required for Outlook operations"}
+                )
+            user_connection: UserConnection = auth_data["userConnection"]
+
+            # Execute action
+            if action == "readMails":
+                return await self._read_mails(parameters, user_connection)
+            if action == "sendMail":
+                return await self._send_mail(parameters, user_connection)
+            return self._create_result(success=False, data={"error": f"Unknown action: {action}"})
+
+        except Exception as e:
+            logger.error(f"Error executing Outlook {action}: {e}")
+            return self._create_result(success=False, data={"error": str(e)})
+
+    async def _read_mails(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult:
+        """
+        Read messages from an Outlook folder.
+
+        Args:
+            parameters: Optional "folder" (default "inbox"), "query",
+                "maxResults" (default 10) and "includeAttachments" (default False).
+            user_connection: Supplies the authToken/refreshToken pair used as credentials.
+
+        Returns:
+            Success result with "folder", "query" and "messages", or a failure
+            result carrying an "error" entry if anything raises.
+        """
+        try:
+            folder = parameters.get("folder", "inbox")
+            query = parameters.get("query")
+            max_results = parameters.get("maxResults", 10)
+            include_attachments = parameters.get("includeAttachments", False)
+
+            # Create Outlook account and open the requested folder
+            account = Account(
+                credentials=(user_connection.authToken, user_connection.refreshToken),
+                protocol=MSGraphProtocol()
+            )
+            target_folder = account.mailbox().folder(folder_name=folder)
+
+            # NOTE(review): O365 appears to leave message.attachments empty unless
+            # they are downloaded with the messages - confirm for the pinned version.
+            fetch_options = {"limit": max_results, "download_attachments": include_attachments}
+            if query:
+                fetch_options["query"] = query
+            messages = target_folder.get_messages(**fetch_options)
+
+            # Process messages
+            results = []
+            for message in messages:
+                # received/sender can be missing (e.g. on drafts); report None for
+                # them instead of letting one message fail the whole read.
+                received = message.received
+                sender = message.sender
+                msg_data = {
+                    "id": message.object_id,
+                    "subject": message.subject,
+                    "from": sender.address if sender is not None else None,
+                    "to": [to.address for to in message.to],
+                    "cc": [cc.address for cc in message.cc],
+                    "received": received.strftime("%Y-%m-%d %H:%M:%S") if received else None,
+                    "body": message.body,
+                    "hasAttachments": message.has_attachments
+                }
+
+                if include_attachments and message.has_attachments:
+                    msg_data["attachments"] = [
+                        {
+                            "name": attachment.name,
+                            "contentType": attachment.content_type,
+                            "size": attachment.size
+                        }
+                        for attachment in message.attachments
+                    ]
+                results.append(msg_data)
+
+            return self._create_result(success=True, data={"folder": folder, "query": query, "messages": results})
+        except Exception as e:
+            logger.error(f"Error reading Outlook emails: {e}")
+            return self._create_result(success=False, data={"error": f"Read failed: {str(e)}"})
+
+    async def _send_mail(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult:
+        """
+        Compose and send an email through Outlook.
+
+        Args:
+            parameters: "to", "subject" and "body" are required; "cc", "bcc"
+                and "attachments" (file paths) are optional.
+            user_connection: Supplies the authToken/refreshToken pair used as credentials.
+
+        Returns:
+            Success result with "to", "subject" and the "sent" timestamp, or a
+            failure result when sending is not confirmed or an exception is raised.
+        """
+        try:
+            to_addresses = parameters["to"]
+            subject = parameters["subject"]
+            body = parameters["body"]
+            # "or []" also covers callers that pass an explicit None
+            cc_addresses = parameters.get("cc") or []
+            bcc_addresses = parameters.get("bcc") or []
+            attachments = parameters.get("attachments") or []
+
+            # Create Outlook account
+            account = Account(
+                credentials=(user_connection.authToken, user_connection.refreshToken),
+                protocol=MSGraphProtocol()
+            )
+
+            # Create new message
+            message = account.mailbox().new_message()
+            message.to.add(to_addresses)
+            if cc_addresses:
+                message.cc.add(cc_addresses)
+            if bcc_addresses:
+                message.bcc.add(bcc_addresses)
+            message.subject = subject
+            message.body = body
+            for attachment_path in attachments:
+                message.attachments.add(attachment_path)
+
+            # send() signals failure through a falsy return value, so success must
+            # not be reported unless it confirms the message went out.
+            if not message.send():
+                return self._create_result(success=False, data={"error": "Send failed: message was not accepted"})
+
+            sent_info = {"to": to_addresses, "subject": subject, "sent": datetime.now(UTC).isoformat()}
+            return self._create_result(success=True, data=sent_info)
+        except Exception as e:
+            logger.error(f"Error sending Outlook email: {e}")
+            return self._create_result(success=False, data={"error": f"Send failed: {str(e)}"})
\ No newline at end of file
diff --git a/modules/methods/methodPowerpoint.py b/modules/methods/methodPowerpoint.py
new file mode 100644
index 00000000..bed7abc9
--- /dev/null
+++ b/modules/methods/methodPowerpoint.py
@@ -0,0 +1,199 @@
+from typing import Dict, Any, Optional
+import logging
+import os
+from pathlib import Path
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+
+logger = logging.getLogger(__name__)
+
+class MethodPowerpoint(MethodBase):
+    """PowerPoint method: read, write and convert presentations (placeholder logic)"""
+
+    def __init__(self):
+        super().__init__()
+        self.auth_source = AuthSource.MICROSOFT  # PowerPoint operations need Microsoft auth
+        self.name = "powerpoint"
+        self.description = "Handle PowerPoint operations like reading, writing, and converting presentations"
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Catalogue of supported actions with retry/timeout settings and parameter specs"""
+        return {
+            "read": {
+                "description": "Read PowerPoint presentation content",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "path": dict(type="string", required=True),
+                    "format": dict(type="string", required=False),
+                    "includeNotes": dict(type="boolean", required=False)
+                }
+            },
+            "write": {
+                "description": "Write content to PowerPoint presentation",
+                "retryMax": 2,
+                "timeout": 60,
+                "parameters": {
+                    "path": dict(type="string", required=True),
+                    "content": dict(type="object", required=True),
+                    "template": dict(type="string", required=False)
+                }
+            },
+            "convert": {
+                "description": "Convert PowerPoint presentation between formats",
+                "retryMax": 2,
+                "timeout": 60,
+                "parameters": {
+                    "sourcePath": dict(type="string", required=True),
+                    "targetPath": dict(type="string", required=True),
+                    "sourceFormat": dict(type="string", required=False),
+                    "targetFormat": dict(type="string", required=False)
+                }
+            }
+        }
+
+    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """
+        Validate the request, then route it to the handler for ``action``.
+
+        Returns the handler's result, or a failure result when the parameters
+        or authentication are rejected, the action is unknown, or an exception
+        is raised along the way.
+        """
+        try:
+            # Reject malformed requests before doing anything else
+            params_ok = await self.validate_parameters(action, parameters)
+            if not params_ok:
+                return self._create_result(success=False, data={"error": f"Invalid parameters for {action}"})
+
+            # Every PowerPoint action needs valid authentication
+            auth_ok = await self.validate_auth(auth_data)
+            if not auth_ok:
+                return self._create_result(
+                    success=False,
+                    data={"error": "Authentication required for PowerPoint operations"}
+                )
+
+            # Route to the handler; anything unrecognised falls through to the error
+            if action == "read":
+                return await self._read_presentation(parameters, auth_data)
+            if action == "write":
+                return await self._write_presentation(parameters, auth_data)
+            if action == "convert":
+                return await self._convert_presentation(parameters, auth_data)
+            unknown = {"error": f"Unknown action: {action}"}
+            return self._create_result(success=False, data=unknown)
+
+        except Exception as e:
+            logger.error(f"Error executing powerpoint {action}: {e}")
+            failure = {"error": str(e)}
+            return self._create_result(success=False, data=failure)
+
+    async def _read_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult:
+        """
+        Return the content of a presentation (currently a fixed example slide).
+
+        Fails when "path" does not exist. "format" defaults to the file
+        extension, or "pptx" when the path has none. The slide's "notes" entry
+        is only filled in when "includeNotes" is truthy.
+        """
+        try:
+            path = Path(parameters["path"])
+            if not path.exists():
+                return self._create_result(success=False, data={"error": f"File not found: {path}"})
+
+            # Explicit format wins, otherwise derive it from the suffix
+            fmt = parameters.get("format") or (path.suffix[1:] if path.suffix else "pptx")
+
+            # TODO: Implement PowerPoint reading using Microsoft Graph API
+            # This is a placeholder implementation
+            if parameters.get("includeNotes", False):
+                notes = "Example notes"
+            else:
+                notes = None
+
+            example_slide = {
+                "number": 1,
+                "title": "Example Slide",
+                "content": "Example content",
+                "notes": notes
+            }
+
+            payload = {"path": str(path), "format": fmt, "slides": [example_slide]}
+            return self._create_result(success=True, data=payload)
+        except Exception as e:
+            logger.error(f"Error reading presentation: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Read failed: {str(e)}"}
+            )
+
+    async def _write_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult:
+        """
+        Create the target directory and return a summary; no file is written yet.
+
+        The summary holds the path, the format (explicit, else the extension,
+        else "pptx") and the number of entries under ``content["slides"]``.
+        """
+        try:
+            path = Path(parameters["path"])
+
+            # Make sure the destination folder exists
+            path.parent.mkdir(parents=True, exist_ok=True)
+
+            # Determine format if not specified
+            fmt = parameters.get("format") or (path.suffix[1:] if path.suffix else "pptx")
+
+            # TODO: Implement PowerPoint writing using Microsoft Graph API
+            # This is a placeholder implementation
+            slide_count = len(parameters["content"].get("slides", []))
+            return self._create_result(
+                success=True,
+                data={"path": str(path), "format": fmt, "slides": slide_count}
+            )
+        except Exception as e:
+            logger.error(f"Error writing presentation: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Write failed: {str(e)}"}
+            )
+
+    async def _convert_presentation(self, parameters: Dict[str, Any], auth_data: Dict[str, Any]) -> MethodResult:
+        """
+        Report a conversion between presentation formats (placeholder, no file is produced).
+
+        Fails when "sourcePath" does not exist. "sourceFormat" and
+        "targetFormat" default to the respective file extension, or "pptx"
+        when a path has none.
+        The returned data echoes both paths and both formats.
+        """
+        try:
+            src = Path(parameters["sourcePath"])
+            dst = Path(parameters["targetPath"])
+
+            if not src.exists():
+                return self._create_result(
+                    success=False,
+                    data={"error": f"Source file not found: {src}"}
+                )
+
+            # Determine formats if not specified
+            src_format = parameters.get("sourceFormat") or (src.suffix[1:] if src.suffix else "pptx")
+            dst_format = parameters.get("targetFormat") or (dst.suffix[1:] if dst.suffix else "pptx")
+
+            # TODO: Implement PowerPoint conversion using Microsoft Graph API
+            # This is a placeholder implementation
+            summary = {
+                "sourcePath": str(src),
+                "targetPath": str(dst),
+                "sourceFormat": src_format,
+                "targetFormat": dst_format
+            }
+            return self._create_result(success=True, data=summary)
+        except Exception as e:
+            logger.error(f"Error converting presentation: {e}")
+            return self._create_result(
+                success=False,
+                data={"error": f"Conversion failed: {str(e)}"}
+            )
\ No newline at end of file
diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py
new file mode 100644
index 00000000..893dccfa
--- /dev/null
+++ b/modules/methods/methodSharepoint.py
@@ -0,0 +1,217 @@
+from typing import Dict, Any, Optional
+import logging
+from datetime import datetime, UTC
+from office365.runtime.auth.user_credential import UserCredential
+from office365.sharepoint.client_context import ClientContext
+from office365.sharepoint.files.file import File
+from office365.sharepoint.lists.list import List
+from office365.sharepoint.lists.list_creation_information import ListCreationInformation
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.models.userConnection import UserConnection
+
+logger = logging.getLogger(__name__)
+
+class MethodSharepoint(MethodBase):
+    """SharePoint method implementation for document operations"""
+
+    def __init__(self):
+        super().__init__()
+        self.name = "sharepoint"
+        self.description = "Handle SharePoint document operations like search, read, and write"
+        self.auth_source = AuthSource.MICROSOFT
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Describe the supported actions, their retry/timeout settings and parameters"""
+        return {
+            "search": {
+                "description": "Search SharePoint documents",
+                "retryMax": 3,
+                "timeout": 30,
+                "parameters": {
+                    "query": {"type": "string", "required": True},
+                    "siteUrl": {"type": "string", "required": True},
+                    "listName": {"type": "string", "required": False},
+                    "maxResults": {"type": "number", "required": False}
+                }
+            },
+            "read": {
+                "description": "Read SharePoint document content",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "fileUrl": {"type": "string", "required": True},
+                    "siteUrl": {"type": "string", "required": True}
+                }
+            },
+            "write": {
+                "description": "Write content to SharePoint document",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "fileUrl": {"type": "string", "required": True},
+                    "siteUrl": {"type": "string", "required": True},
+                    "content": {"type": "string", "required": True},
+                    "contentType": {"type": "string", "required": False}
+                }
+            }
+        }
+
+    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """
+        Validate the request and dispatch it to the matching SharePoint handler.
+
+        Args:
+            action: "search", "read" or "write".
+            parameters: Parameters for the action, as declared in ``actions``.
+            auth_data: Must contain a "userConnection" entry.
+
+        Returns:
+            The handler's result, or a failure result when validation fails, the
+            connection is missing, the action is unknown or an exception is raised.
+        """
+        try:
+            # Validate parameters
+            if not await self.validate_parameters(action, parameters):
+                return self._create_result(success=False, data={"error": f"Invalid parameters for {action}"})
+
+            # Get UserConnection from auth_data
+            if not auth_data or "userConnection" not in auth_data:
+                return self._create_result(
+                    success=False,
+                    data={"error": "UserConnection required for SharePoint operations"}
+                )
+            user_connection: UserConnection = auth_data["userConnection"]
+
+            # Execute action
+            if action == "search":
+                return await self._search_documents(parameters, user_connection)
+            if action == "read":
+                return await self._read_document(parameters, user_connection)
+            if action == "write":
+                return await self._write_document(parameters, user_connection)
+            return self._create_result(success=False, data={"error": f"Unknown action: {action}"})
+
+        except Exception as e:
+            logger.error(f"Error executing SharePoint {action}: {e}")
+            return self._create_result(success=False, data={"error": str(e)})
+
+    async def _search_documents(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult:
+        """
+        Search a SharePoint list by title, or the whole site by keyword.
+
+        "siteUrl" and "query" are required; "listName" restricts the search to
+        that list (exact Title match) and "maxResults" (default 10) caps the hits.
+        Returns "query" and "results" on success, an "error" entry on failure.
+        """
+        try:
+            site_url = parameters["siteUrl"]
+            query = parameters["query"]
+            list_name = parameters.get("listName")
+            max_results = parameters.get("maxResults", 10)
+
+            # Create SharePoint context
+            ctx = ClientContext(site_url).with_credentials(
+                UserCredential(user_connection.authToken, user_connection.refreshToken)
+            )
+
+            # Search in specific list or entire site
+            if list_name:
+                # The query is embedded in an OData string literal, where a single
+                # quote is escaped by doubling it; unescaped it would break the filter.
+                title = str(query).replace("'", "''")
+                target_list = ctx.web.lists.get_by_title(list_name)
+                items = target_list.items.filter(f"Title eq '{title}'").top(max_results).get().execute_query()
+                results = [{
+                    "title": item.properties["Title"],
+                    "url": item.properties["FileRef"],
+                    "modified": item.properties["Modified"],
+                    "created": item.properties["Created"]
+                } for item in items]
+            else:
+                # Search entire site
+                search_results = ctx.search(query).execute_query()
+                results = [{
+                    "title": result.properties["Title"],
+                    "url": result.properties["Path"],
+                    "modified": result.properties["LastModifiedTime"],
+                    "created": result.properties["Created"]
+                } for result in search_results[:max_results]]
+
+            return self._create_result(success=True, data={"query": query, "results": results})
+        except Exception as e:
+            logger.error(f"Error searching SharePoint documents: {e}")
+            return self._create_result(success=False, data={"error": f"Search failed: {str(e)}"})
+
+    async def _read_document(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult:
+        """
+        Read a SharePoint file and return its content decoded as UTF-8 text.
+
+        "siteUrl" and "fileUrl" (server-relative) are required. The result holds
+        "url", "content", "modified" and "size"; any error yields a failure result.
+        """
+        try:
+            site_url = parameters["siteUrl"]
+            file_url = parameters["fileUrl"]
+
+            # Create SharePoint context
+            ctx = ClientContext(site_url).with_credentials(
+                UserCredential(user_connection.authToken, user_connection.refreshToken)
+            )
+
+            # Get file
+            file = ctx.web.get_file_by_server_relative_url(file_url)
+            file_content = file.read().execute_query()
+
+            document = {
+                "url": file_url,
+                "content": file_content.content.decode('utf-8'),
+                "modified": file.properties["TimeLastModified"],
+                "size": file.properties["Length"]
+            }
+            return self._create_result(success=True, data=document)
+        except Exception as e:
+            logger.error(f"Error reading SharePoint document: {e}")
+            return self._create_result(success=False, data={"error": f"Read failed: {str(e)}"})
+
+    async def _write_document(self, parameters: Dict[str, Any], user_connection: UserConnection) -> MethodResult:
+        """
+        Write UTF-8 text to a SharePoint file, uploading it if the lookup fails.
+
+        "siteUrl", "fileUrl" and "content" are required. Returns "url",
+        "modified" and "size" (bytes written) on success, an "error" entry otherwise.
+        """
+        try:
+            site_url = parameters["siteUrl"]
+            file_url = parameters["fileUrl"]
+            content = parameters["content"]
+            # "contentType" is accepted by the action but not used when writing
+            payload = content.encode('utf-8')  # encode once, reuse for upload and size
+
+            # Create SharePoint context
+            ctx = ClientContext(site_url).with_credentials(
+                UserCredential(user_connection.authToken, user_connection.refreshToken)
+            )
+
+            # NOTE(review): the lookup below may only build a lazy reference, in which
+            # case a missing file surfaces on execute_query instead - confirm.
+            try:
+                file = ctx.web.get_file_by_server_relative_url(file_url)
+            except Exception:
+                # Lookup failed: upload a new file, which already carries the
+                # content, so no second write is needed.
+                folder_url, _, file_name = file_url.rpartition("/")
+                folder = ctx.web.get_folder_by_server_relative_url(folder_url)
+                folder.upload_file(file_name, payload).execute_query()
+            else:
+                # Update file content
+                file.write(payload).execute_query()
+
+            return self._create_result(
+                success=True,
+                data={"url": file_url, "modified": datetime.now(UTC).isoformat(), "size": len(payload)}
+            )
+        except Exception as e:
+            logger.error(f"Error writing SharePoint document: {e}")
+            return self._create_result(success=False, data={"error": f"Write failed: {str(e)}"})
\ No newline at end of file
diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py
new file mode 100644
index 00000000..cf968ed4
--- /dev/null
+++ b/modules/methods/methodWeb.py
@@ -0,0 +1,398 @@
+from typing import Dict, Any, Optional
+import logging
+import aiohttp
+import asyncio
+from bs4 import BeautifulSoup
+from urllib.parse import urljoin, urlparse
+import re
+from datetime import datetime, UTC
+import requests
+import time
+
+from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
+from modules.shared.configuration import APP_CONFIG
+
+logger = logging.getLogger(__name__)
+
+class MethodWeb(MethodBase):
+    """Web method implementation for web operations"""
+
+    def __init__(self):
+        super().__init__()
+        self.name = "web"
+        self.description = "Handle web operations like search, crawl, and content extraction"
+        self.auth_source = AuthSource.LOCAL  # Web operations typically don't need auth
+
+        # Web crawling configuration from agentWebcrawler
+        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
+        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
+        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
+        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
+        self.timeout = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_TIMEOUT", "30"))
+        self.userAgent = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
+
+        if not self.srcApikey:
+            logger.error("SerpAPI key not configured")
+
+    @property
+    def actions(self) -> Dict[str, Dict[str, Any]]:
+        """Describe the supported actions, their retry/timeout settings and parameters"""
+        return {
+            "search": {
+                "description": "Search web content",
+                "retryMax": 3,
+                "timeout": 30,
+                "parameters": {
+                    "query": {"type": "string", "required": True},
+                    "maxResults": {"type": "number", "required": False},
+                    "filters": {"type": "object", "required": False},
+                    "searchEngine": {"type": "string", "required": False}
+                }
+            },
+            "crawl": {
+                "description": "Crawl web pages",
+                "retryMax": 2,
+                "timeout": 60,
+                "parameters": {
+                    "url": {"type": "string", "required": True},
+                    "depth": {"type": "number", "required": False},
+                    "followLinks": {"type": "boolean", "required": False},
+                    "includeImages": {"type": "boolean", "required": False},
+                    "respectRobots": {"type": "boolean", "required": False}
+                }
+            },
+            "extract": {
+                "description": "Extract content from web page",
+                "retryMax": 2,
+                "timeout": 30,
+                "parameters": {
+                    "url": {"type": "string", "required": True},
+                    "selectors": {"type": "array", "items": "string", "required": False},
+                    "format": {"type": "string", "required": False},
+                    "includeMetadata": {"type": "boolean", "required": False}
+                }
+            }
+        }
+
+    async def execute(self, action: str, parameters: Dict[str, Any], auth_data: Optional[Dict[str, Any]] = None) -> MethodResult:
+        """
+        Validate the request and dispatch it to the matching web handler.
+
+        Args:
+            action: "search", "crawl" or "extract".
+            parameters: Parameters for the action, as declared in ``actions``.
+            auth_data: Not used by the web actions.
+
+        Returns:
+            The handler's result, or a failure result when validation fails,
+            the action is unknown or an exception is raised.
+        """
+        try:
+            # Validate parameters
+            if not await self.validate_parameters(action, parameters):
+                return self._create_result(success=False, data={"error": f"Invalid parameters for {action}"})
+
+            # Execute action
+            if action == "search":
+                return await self._search_web(parameters)
+            if action == "crawl":
+                return await self._crawl_page(parameters)
+            if action == "extract":
+                return await self._extract_content(parameters)
+            return self._create_result(success=False, data={"error": f"Unknown action: {action}"})
+        except Exception as e:
+            logger.error(f"Error executing web {action}: {e}")
+            return self._create_result(success=False, data={"error": str(e)})
+
+    async def _search_web(self, parameters: Dict[str, Any]) -> MethodResult:
+        """
+        Search the web with the requested engine.
+
+        Args:
+            parameters: "query" is required; "maxResults" (default 10), "filters"
+                (default {}) and "searchEngine" (default "google") are optional.
+
+        Returns:
+            Success result with "query", "engine" and "results", or a failure
+            result for an unsupported engine or when an exception is raised.
+        """
+        try:
+            query = parameters["query"]
+            max_results = parameters.get("maxResults", 10)
+            filters = parameters.get("filters", {})
+            search_engine = parameters.get("searchEngine", "google")
+            engine = search_engine.lower()  # engine names are matched case-insensitively
+            if engine == "google":
+                # TODO: Implement Google Custom Search API integration
+                results = await self._google_search(query, max_results, filters)
+            elif engine == "bing":
+                # TODO: Implement Bing Web Search API integration
+                results = await self._bing_search(query, max_results, filters)
+            else:
+                return self._create_result(
+                    success=False,
+                    data={"error": f"Unsupported search engine: {search_engine}"}
+                )
+
+            return self._create_result(
+                success=True,
+                data={"query": query, "engine": search_engine, "results": results}
+            )
+        except Exception as e:
+            logger.error(f"Error searching web: {e}")
+            return self._create_result(success=False, data={"error": f"Search failed: {str(e)}"})
+
+    async def _google_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list:
+        """Search using Google Custom Search API (placeholder: returns one fixed example result)"""
+        # TODO: Implement Google Custom Search API
+        # This is a placeholder implementation
+        return [
+            {
+                "title": "Example Result",
+                "url": "https://example.com",
+                "snippet": "Example search result snippet",
+                "source": "google"
+            }
+        ]
+
+    async def _bing_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list:
+        """Search using Bing Web Search API (placeholder: returns one fixed example result)"""
+        # TODO: Implement Bing Web Search API
+        # This is a placeholder implementation
+        return [
+            {
+                "title": "Example Result",
+                "url": "https://example.com",
+                "snippet": "Example search result snippet",
+                "source": "bing"
+            }
+        ]
+
+    async def _crawl_page(self, parameters: Dict[str, Any]) -> MethodResult:
+        """
+        Fetch a page and return its title, description, text and optional links/images.
+
+        Args:
+            parameters: "url" is required. "followLinks" (default False) fills
+                "links", "includeImages" (default False) fills "images" and
+                "respectRobots" (default True) checks robots.txt first. "depth"
+                is accepted but not used: only the given page is fetched.
+
+        Returns:
+            Success result describing the page, or a failure result when
+            robots.txt forbids the URL, the status is not 200 or an error occurs.
+        """
+        try:
+            url = parameters["url"]
+            follow_links = parameters.get("followLinks", False)
+            include_images = parameters.get("includeImages", False)
+            respect_robots = parameters.get("respectRobots", True)
+
+            # Check robots.txt if required
+            if respect_robots and not await self._check_robots_txt(url):
+                return self._create_result(success=False, data={"error": "Crawling not allowed by robots.txt"})
+
+            # Fetch with the same user agent the robots.txt check was evaluated for
+            headers = {"User-Agent": self.userAgent}
+            timeout = aiohttp.ClientTimeout(total=self.timeout)
+            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
+                async with session.get(url) as response:
+                    if response.status != 200:
+                        return self._create_result(
+                            success=False,
+                            data={"error": f"Failed to fetch URL: {response.status}"}
+                        )
+                    html = await response.text()
+
+            soup = BeautifulSoup(html, 'html.parser')
+            result = {
+                "url": url,
+                "title": soup.title.string if soup.title else None,
+                "description": self._get_meta_description(soup),
+                "links": [],
+                "images": [] if include_images else None,
+                "text": soup.get_text(strip=True),
+                "crawled": datetime.now(UTC).isoformat()
+            }
+
+            # Extract links if followLinks is True
+            if follow_links:
+                for link in soup.find_all('a'):
+                    href = link.get('href')
+                    if not href:
+                        continue
+                    absolute_url = urljoin(url, href)
+                    if self._is_valid_url(absolute_url):
+                        result["links"].append({
+                            "url": absolute_url,
+                            "text": link.get_text(strip=True)
+                        })
+
+            # Extract images if includeImages is True
+            if include_images:
+                for img in soup.find_all('img'):
+                    src = img.get('src')
+                    if src:
+                        result["images"].append({
+                            "url": urljoin(url, src),
+                            "alt": img.get('alt', ''),
+                            "title": img.get('title', '')
+                        })
+
+            return self._create_result(success=True, data=result)
+        except Exception as e:
+            logger.error(f"Error crawling page: {e}")
+            return self._create_result(success=False, data={"error": f"Crawl failed: {str(e)}"})
+
+    async def _extract_content(self, parameters: Dict[str, Any]) -> MethodResult:
+        """
+        Extract content from a web page, by CSS selector or with a default layout.
+
+        "url" is required. With "selectors" each selector maps to the texts of its
+        matches; otherwise "title", "text" and "links" are returned. A truthy
+        "includeMetadata" adds a "metadata" entry. "format" is accepted but unused.
+        """
+        try:
+            url = parameters["url"]
+            selectors = parameters.get("selectors")
+            include_metadata = parameters.get("includeMetadata", False)
+
+            headers = {"User-Agent": self.userAgent}
+            timeout = aiohttp.ClientTimeout(total=self.timeout)
+            async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
+                async with session.get(url) as response:
+                    if response.status != 200:
+                        return self._create_result(
+                            success=False,
+                            data={"error": f"Failed to fetch URL: {response.status}"}
+                        )
+                    html = await response.text()
+
+            soup = BeautifulSoup(html, 'html.parser')
+            # Taken from the page itself so metadata also works in selector mode
+            page_text = soup.get_text(strip=True)
+            page_links = [a.get('href') for a in soup.find_all('a')]
+
+            if selectors:
+                content = {
+                    selector: [elem.get_text() for elem in soup.select(selector)]
+                    for selector in selectors
+                }
+            else:
+                # Default extraction
+                content = {
+                    "title": soup.title.string if soup.title else None,
+                    "text": page_text,
+                    "links": page_links
+                }
+
+            # Add metadata if requested
+            if include_metadata:
+                content["metadata"] = {
+                    "url": url,
+                    "crawled": datetime.now(UTC).isoformat(),
+                    "language": self._detect_language(soup),
+                    "wordCount": len(page_text.split()),
+                    "linksCount": len(page_links)
+                }
+
+            return self._create_result(success=True, data={"url": url, "content": content})
+        except Exception as e:
+            logger.error(f"Error extracting content: {e}")
+            return self._create_result(success=False, data={"error": f"Extraction failed: {str(e)}"})
+
+    def _get_meta_description(self, soup: BeautifulSoup) -> Optional[str]:
+        """Return the content of the <meta name="description"> tag, or None if absent"""
+        meta_desc = soup.find('meta', attrs={'name': 'description'})
+        if meta_desc:
+            return meta_desc.get('content')
+        return None
+
+    def _is_valid_url(self, url: str) -> bool:
+        """Return True when the URL parses with both a scheme and a network location"""
+        try:
+            result = urlparse(url)
+            return all([result.scheme, result.netloc])
+        except (ValueError, TypeError, AttributeError):  # malformed or non-string input
+            return False
+
+    async def _check_robots_txt(self, url: str) -> bool:
+        """
+        Check whether robots.txt on the URL's host allows fetching the URL.
+
+        The file is evaluated for the configured user agent with the standard
+        library parser. A missing file (non-200 status) or any error while
+        fetching it is treated as "allowed", so an unreachable robots.txt
+        never blocks a crawl.
+
+        Returns:
+            True if crawling is permitted or could not be determined, else False.
+        """
+        # Imported locally so the block carries its own dependency
+        from urllib.robotparser import RobotFileParser
+
+        try:
+            parsed_url = urlparse(url)
+            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
+
+            # Fetch robots.txt with the configured user agent and timeout
+            timeout = aiohttp.ClientTimeout(total=self.timeout)
+            async with aiohttp.ClientSession(timeout=timeout) as session:
+                async with session.get(robots_url, headers={"User-Agent": self.userAgent}) as response:
+                    if response.status != 200:
+                        # If robots.txt doesn't exist, assume crawling is allowed
+                        return True
+                    robots_content = await response.text()
+
+            # RobotFileParser applies user-agent groups and Allow rules and
+            # compares paths case-sensitively; can_fetch() uses the group that
+            # matches our user agent and otherwise falls back to the "*" group.
+            parser = RobotFileParser()
+            parser.parse(robots_content.splitlines())
+            return parser.can_fetch(self.userAgent, url)
+
+        except Exception as e:
+            logger.warning(f"Error checking robots.txt for {url}: {str(e)}")
+            # If there's an error, assume crawling is allowed
+            return True
+
+    def _detect_language(self, soup: BeautifulSoup) -> str:
+        """
+        Guess the page language, defaulting to 'en'.
+
+        Order of precedence: the <html lang> attribute, a content-language
+        meta tag, then a count of common English, Spanish, French and German
+        words in the page text. Pages without any hit, and errors, yield 'en'.
+        """
+        try:
+            # Try to get language from HTML lang attribute
+            if soup.html and soup.html.get('lang'):
+                return soup.html.get('lang')
+
+            # Try to get language from meta tag
+            meta_lang = soup.find('meta', attrs={'http-equiv': 'content-language'})
+            if meta_lang:
+                return meta_lang.get('content', 'en')
+
+            # The charset says nothing about the language (UTF-8 is used for all of
+            # them), so it is deliberately not consulted here.
+
+            # Simple heuristic: count how many common words of each language occur.
+            # Splitting on whitespace also matches words next to line breaks or tabs.
+            tokens = set(soup.get_text().lower().split())
+            common_words = {
+                'en': ['the', 'and', 'of', 'to', 'in', 'is', 'that', 'for', 'it', 'with'],
+                'es': ['el', 'la', 'los', 'las', 'de', 'y', 'en', 'que', 'por', 'con'],
+                'fr': ['le', 'la', 'les', 'de', 'et', 'en', 'que', 'pour', 'avec', 'dans'],
+                'de': ['der', 'die', 'das', 'und', 'in', 'den', 'von', 'zu', 'für', 'mit']
+            }
+
+            word_counts = {lang: sum(1 for word in words if word in tokens)
+                           for lang, words in common_words.items()}
+
+            # max() returns the first maximal entry, so 'en' wins when nothing matches
+            return max(word_counts.items(), key=lambda x: x[1])[0]
+
+        except Exception as e:
+            logger.warning(f"Error detecting language: {str(e)}")
+            return 'en'  # Default to English on error
\ No newline at end of file
diff --git a/modules/workflow/agentBase.py b/modules/workflow/agentBase.py
deleted file mode 100644
index 0c79681a..00000000
--- a/modules/workflow/agentBase.py
+++ /dev/null
@@ -1,214 +0,0 @@
-"""
-Agent Base Module.
-Provides the base class for all chat agents.
-Defines the standardized interface for task processing.
-"""
-
-import os
-import logging
-import uuid
-from datetime import datetime, UTC
-from typing import Dict, Any, List, Optional
-from modules.shared.mimeUtils import isTextMimeType, determineContentEncoding
-from modules.interfaces.serviceChatModel import ChatContent, Task, AgentResponse, ChatMessage
-
-logger = logging.getLogger(__name__)
-
-class AgentBase:
- """
- Base class for all chat agents.
- Defines the standardized interface for task processing.
- """
-
- def __init__(self):
- """Initialize the base agent."""
- self.name = "base"
- self.label = "Base Agent"
- self.description = "Base agent functionality"
- self.capabilities = []
- self.service = None
-
- def setService(self, service):
- """
- Set the service container reference and validate required interfaces.
-
- Args:
- service: The service container with required interfaces
- """
- if not service:
- logger.warning("Attempted to set null service container")
- return False
-
- # Validate required interfaces
- required_interfaces = ['base', 'msft', 'google']
- missing_interfaces = []
- for interface in required_interfaces:
- if not hasattr(service, interface):
- missing_interfaces.append(interface)
-
- if missing_interfaces:
- logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
- return False
-
- self.service = service
- return True
-
- def getAgentInfo(self) -> Dict[str, Any]:
- """
- Return standardized information about the agent's capabilities.
-
- Returns:
- Dictionary with name, description, and capabilities
- """
- return {
- "name": self.name,
- "label": self.label,
- "description": self.description,
- "capabilities": self.capabilities
- }
-
- async def execute(self, task: Task) -> AgentResponse:
- """
- Execute a task and return the response.
- This method must be implemented by all concrete agent classes.
-
- Args:
- task: Task object containing all necessary information
-
- Returns:
- AgentResponse object with execution results
- """
- # Validate service manager
- if not self.service:
- logger.error("Service container not initialized")
- return AgentResponse(
- success=False,
- message=ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=task.workflowId,
- agentName=self.name,
- message="Error: Service container not initialized",
- role="system",
- status="error",
- sequenceNr=0,
- startedAt=datetime.now(UTC).isoformat(),
- finishedAt=datetime.now(UTC).isoformat(),
- success=False
- ),
- performance={},
- progress=0.0
- )
-
- try:
- # Process the task using the concrete implementation
- result = await self.processTask(task)
-
- # Create response message
- message = ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=task.workflowId,
- agentName=self.name,
- message=result.get("feedback", ""),
- role="assistant",
- status="completed",
- sequenceNr=0,
- startedAt=datetime.now(UTC).isoformat(),
- finishedAt=datetime.now(UTC).isoformat(),
- success=True
- )
-
- # Create response with performance metrics
- return AgentResponse(
- success=True,
- message=message,
- performance=result.get("performance", {}),
- progress=result.get("progress", 100.0)
- )
-
- except Exception as e:
- logger.error(f"Error processing task: {str(e)}", exc_info=True)
- return AgentResponse(
- success=False,
- message=ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=task.workflowId,
- agentName=self.name,
- message=f"Error processing task: {str(e)}",
- role="system",
- status="error",
- sequenceNr=0,
- startedAt=datetime.now(UTC).isoformat(),
- finishedAt=datetime.now(UTC).isoformat(),
- success=False
- ),
- performance={},
- progress=0.0
- )
-
- async def processTask(self, task: Task) -> Dict[str, Any]:
- """
- Process a task and return the results.
- This method must be implemented by all concrete agent classes.
-
- Args:
- task: Task object containing all necessary information
-
- Returns:
- Dictionary containing:
- - feedback: Text response explaining what the agent did
- - performance: Optional performance metrics
- - progress: Task progress (0-100)
- """
- raise NotImplementedError("processTask must be implemented by concrete agent classes")
-
- def determineBase64EncodingFlag(self, filename: str, content: Any, mimeType: str = None) -> bool:
- """
- Determine if content should be base64 encoded.
-
- Args:
- filename: Name of the file
- content: Content to check
- mimeType: Optional MIME type
-
- Returns:
- Boolean indicating if content should be base64 encoded
- """
- return determineContentEncoding(filename, content, mimeType)
-
- def isTextMimeType(self, mimeType: str) -> bool:
- """
- Check if MIME type is text-based.
-
- Args:
- mimeType: MIME type to check
-
- Returns:
- Boolean indicating if MIME type is text-based
- """
- return isTextMimeType(mimeType)
-
- def formatAgentDocumentOutput(self, label: str, content: str, contentType: str, base64Encoded: bool = False) -> ChatContent:
- """
- Format agent document output using ChatContent model.
-
- Args:
- label: Document label/filename
- content: Document content
- contentType: MIME type of content
- base64Encoded: Whether content is base64 encoded
-
- Returns:
- ChatContent object with the following attributes:
- - sequenceNr: Sequence number (defaults to 1)
- - name: Document label/filename
- - mimeType: MIME type of content
- - data: Actual content
- - metadata: Additional metadata including base64Encoded flag
- """
- return ChatContent(
- sequenceNr=1,
- name=label,
- mimeType=contentType,
- data=content,
- metadata={"base64Encoded": base64Encoded}
- )
\ No newline at end of file
diff --git a/modules/workflow/agentManager.py b/modules/workflow/agentManager.py
deleted file mode 100644
index 91bae403..00000000
--- a/modules/workflow/agentManager.py
+++ /dev/null
@@ -1,212 +0,0 @@
-"""
-Agent Manager Module for managing agent operations and execution.
-"""
-
-import os
-import logging
-import importlib
-from typing import Dict, Any, List, Optional, Tuple
-from datetime import datetime, UTC
-import uuid
-from modules.interfaces.serviceChatModel import (
- ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow, AgentResponse
-)
-logger = logging.getLogger(__name__)
-
-class AgentManager:
- """Manager for agent operations and execution."""
-
- _instance = None
-
- @classmethod
- def getInstance(cls):
- """Return a singleton instance of the agent manager."""
- if cls._instance is None:
- cls._instance = cls()
- return cls._instance
-
- # Internal Methods
-
- def __init__(self):
- """Initialize the agent manager."""
- if AgentManager._instance is not None:
- raise RuntimeError("Singleton instance already exists - use getInstance()")
-
- self.service = None
- self.agents = {} # Dictionary to store agent instances
- self._loadAgents() # Load agents on initialization
-
- def _loadAgents(self):
- """Load all available agents from modules dynamically."""
- logger.info("Loading agent modules...")
-
- # Get the agents directory path
- agentDir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "agents")
-
- # Search for agent modules
- agentModules = []
- for filename in os.listdir(agentDir):
- if filename.startswith("agent") and filename.endswith(".py"):
- agentModules.append(filename[:-3]) # Remove .py extension
-
- if not agentModules:
- logger.warning("No agent modules found in directory: %s", agentDir)
- return
-
- logger.info(f"Found {len(agentModules)} agent modules: {', '.join(agentModules)}")
-
- # Load each agent module
- for moduleName in agentModules:
- try:
- # Import the module
- module = importlib.import_module(f"modules.agents.{moduleName}")
-
- # Extract agent name from module name
- agentName = moduleName.split("agent")[-1]
- className = f"Agent{agentName}"
- getterName = f"getAgent{agentName}"
-
- agent = None
-
- # Try to get the agent via the getter function first
- if hasattr(module, getterName):
- getterFunc = getattr(module, getterName)
- agent = getterFunc()
- logger.info(f"Agent '{agent.name}' loaded via {getterName}()")
-
- # If no getter, try to instantiate the agent class directly
- elif hasattr(module, className):
- agentClass = getattr(module, className)
- agent = agentClass()
- logger.info(f"Agent '{agent.name}' directly instantiated from {className}")
-
- if agent:
- # Register the agent
- if self._registerAgent(agent):
- logger.info(f"Successfully registered agent: {agent.name}")
- else:
- logger.error(f"Failed to register agent from module: {moduleName}")
- else:
- logger.warning(f"No agent class or getter function found in module: {moduleName}")
-
- except ImportError as e:
- logger.error(f"Failed to import module {moduleName}: {str(e)}")
- except Exception as e:
- logger.error(f"Error loading agent from module {moduleName}: {str(e)}")
-
- def _registerAgent(self, agent: Any):
- """Register a new agent with the manager."""
- if not hasattr(agent, 'name'):
- logger.error("Agent must have a name attribute")
- return False
-
- self.agents[agent.name] = agent
- if self.service and hasattr(agent, 'setService'):
- agent.setService(self.service)
-
- return True
-
- # Public Methods
-
- def initialize(self, service: Any):
- """Initialize the manager with service reference."""
- # Store service reference
- self.service = service
-
- # Initialize agents with service
- for agent in self.agents.values():
- if hasattr(agent, 'setService'):
- agent.setService(service)
-
- return True
-
- def getAgent(self, agentIdentifier: str) -> Optional[Any]:
- """
- Get an agent instance by its identifier.
-
- Args:
- agentIdentifier: Name or identifier of the agent
-
- Returns:
- Agent instance if found, None otherwise
- """
- agent = self.agents.get(agentIdentifier)
- if not agent:
- logger.warning(f"Agent '{agentIdentifier}' not found")
- return agent
-
- def getAllAgents(self) -> Dict[str, Any]:
- """
- Get all registered agents.
-
- Returns:
- Dictionary mapping agent names to agent instances
- """
- return self.agents.copy()
-
- def getAgentInfos(self) -> List[Dict[str, Any]]:
- """Get information about all registered agents."""
- return [
- {
- 'name': agent.name,
- 'description': getattr(agent, 'description', ''),
- 'capabilities': getattr(agent, 'capabilities', []),
- 'inputTypes': getattr(agent, 'inputTypes', []),
- 'outputTypes': getattr(agent, 'outputTypes', [])
- }
- for agent in self.agents.values()
- ]
-
- async def executeAgent(self, handover: Any) -> AgentResponse:
- """
- Execute an agent with the given handover.
-
- Args:
- handover: Handover object containing agent execution context
-
- Returns:
- AgentResponse object with execution results
- """
- try:
- # Get agent instance
- agent = self.agents.get(handover.currentAgent)
- if not agent:
- raise ValueError(f"Agent {handover.currentAgent} not found")
-
- # Execute agent
- response = await agent.execute(handover)
-
- # Save output files if any
- if response.message and response.message.documents:
- self.service.document['agentOutputFilesSave'](handover, response.message.documents)
-
- return response
-
- except Exception as e:
- logger.error(f"Error executing agent {handover.currentAgent}: {str(e)}")
-
- # Create error message
- errorMessage = ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=handover.workflowId,
- agentName=handover.currentAgent,
- message=f"Error executing agent: {str(e)}",
- role="system",
- status="error",
- sequenceNr=0,
- startedAt=handover.startedAt,
- finishedAt=datetime.now(UTC).isoformat(),
- success=False
- )
-
- return AgentResponse(
- success=False,
- message=errorMessage,
- error=str(e),
- performance={},
- progress=0.0
- )
-
-# Singleton factory for the agent manager
-def getAgentManager():
- return AgentManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/chatManager.py b/modules/workflow/chatManager.py
deleted file mode 100644
index bb3e1262..00000000
--- a/modules/workflow/chatManager.py
+++ /dev/null
@@ -1,617 +0,0 @@
-"""
-Chat Manager Module for managing chat workflows and agent handovers.
-"""
-
-import logging
-from typing import Dict, Any, List, Optional, Union
-from datetime import datetime, UTC
-import uuid
-import json
-from dataclasses import dataclass
-from modules.interfaces.serviceChatModel import (
- ChatLog, ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow,
- AgentHandover
-)
-from modules.workflow.agentManager import getAgentManager
-from modules.workflow.documentManager import getDocumentManager
-
-logger = logging.getLogger(__name__)
-
-class ChatManager:
- """Manager for chat workflows and agent handovers."""
-
- _instance = None
-
- @classmethod
- def getInstance(cls):
- """Return a singleton instance of the chat manager."""
- if cls._instance is None:
- cls._instance = cls()
- return cls._instance
-
- # Core functions
-
- def __init__(self):
- """Initialize the chat manager."""
- if ChatManager._instance is not None:
- raise RuntimeError("Singleton instance already exists - use getInstance()")
-
- self.service = None
- self.agentManager = getAgentManager()
- self.documentManager = getDocumentManager()
-
- def initialize(self, workflow: ChatWorkflow):
- """
- Initialize the manager with an optional workflow object.
-
- Args:
- workflow: Optional ChatWorkflow object to initialize with
- """
- # Initialize managers
- self.agentManager.initialize(self.service)
- self.documentManager.initialize(self.service)
-
- # Add basic references to service
- self.service.workflow = workflow
- self.service.logAdd = self.logAdd
-
- self.service.user = {
- 'id': None,
- 'name': None,
- 'language': 'en'
- }
- self.service.functions = {
- 'forEach': lambda items, action: [action(item) for item in items],
- 'while': lambda condition, action: [action() for _ in iter(lambda: condition(), False)]
- }
- self.service.model = {
- 'callAiBasic': self._callAiBasic,
- 'callAiComplex': self._callAiComplex,
- 'callAiImage': self._callAiImage
- }
-
- # Initialize document operations
- self.service.document = {
- 'extract': self.documentManager.extractContent,
- 'convertFileRefToFileId': self.documentManager.convertFileRefToId,
- 'convertFileIdToFileRef': self.documentManager.convertFileIdToRef,
- 'convertDataFormat': self.documentManager.convertDataFormat,
- 'agentInputFilesCreate': self.documentManager.createAgentInputFileList,
- 'agentOutputFilesSave': self.documentManager.saveAgentOutputFiles
- }
-
- # Initialize data access
- from modules.workflow.dataAccessFunctions import get_data_access
- self.service.data = get_data_access().to_service_object()
-
- return True
-
- def createInitialHandover(self, userInput: UserInputRequest) -> AgentHandover:
- """
- Create the initial handover object from user input.
-
- Args:
- userInput: User input request
-
- Returns:
- Initial handover object
- """
- try:
- # Create initial handover
- handover = AgentHandover(
- promptUserInitial=userInput.message,
- documentsUserInitial=userInput.listFileId or [],
- startedAt=datetime.now(UTC).isoformat()
- )
-
- # Process user input documents
- if handover.documentsUserInitial:
- handover.documentsInput = handover.documentsUserInitial
-
- # Set initial prompt for next agent
- handover.promptForNextAgent = handover.promptUserInitial
-
- return handover
-
- except Exception as e:
- logger.error(f"Error creating initial handover: {str(e)}")
- return AgentHandover(status="failed", error=str(e))
-
- async def defineNextHandover(self, currentHandover: AgentHandover) -> Optional[AgentHandover]:
- """
- Define the next handover object for agent transition.
-
- Args:
- currentHandover: Current handover object
-
- Returns:
- Next handover object or None if no next agent
- """
- try:
- # Get available agents
- availableAgents = self.agentManager.getAgentInfos()
- if not availableAgents:
- logger.warning("No available agents found")
- return None
-
- # Create next handover object
- nextHandover = AgentHandover(
- promptUserInitial=currentHandover.promptUserInitial,
- documentsUserInitial=currentHandover.documentsUserInitial,
- startedAt=datetime.now(UTC).isoformat()
- )
-
- # If this is the first handover, use initial documents
- if not currentHandover.promptFromFinishedAgent:
- nextHandover.documentsInput = currentHandover.documentsUserInitial
- nextHandover.promptForNextAgent = currentHandover.promptUserInitial
- else:
- # Use output documents from previous agent
- nextHandover.documentsInput = currentHandover.documentsOutput
- nextHandover.promptForNextAgent = currentHandover.promptFromFinishedAgent
-
- # Select next agent based on available agents and current state
- nextAgent = await self._selectNextAgent(availableAgents, nextHandover)
- if not nextAgent:
- logger.info("No suitable next agent found")
- return None
-
- nextHandover.nextAgent = nextAgent['name']
- return nextHandover
-
- except Exception as e:
- logger.error(f"Error defining next handover: {str(e)}")
- return None
-
- async def _selectNextAgent(self, availableAgents: List[Dict[str, Any]], handover: AgentHandover) -> Optional[Dict[str, Any]]:
- """
- Select the next agent using AI analysis of the current state and requirements.
-
- Args:
- availableAgents: List of available agents
- handover: Current handover object
-
- Returns:
- Selected agent or None if no suitable agent
- """
- try:
- if not availableAgents:
- logger.warning("No available agents found")
- return None
-
- # Get current workflow state
- workflow = self.service.workflow
- if not workflow:
- logger.error("No workflow context available")
- return None
-
- # Detect user language if not already set
- if not workflow.userLanguage:
- workflow.userLanguage = await self._detectUserLanguage(handover.promptUserInitial)
-
- # Get workflow summary for context
- workflow_summary = await self.workflowSummarize(ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=workflow.id,
- role="user",
- message=handover.promptUserInitial
- ))
-
- # Prepare context for AI analysis
- context = {
- "current_state": {
- "previous_agent": handover.currentAgent,
- "status": handover.status,
- "error": handover.error,
- "user_language": workflow.userLanguage,
- "input_documents": handover.documentsInput or [],
- "output_documents": handover.documentsOutput or [],
- "required_capabilities": handover.requiredCapabilities or []
- },
- "conversation_history": workflow_summary,
- "available_agents": [
- {
- "name": agent.get("name", ""),
- "capabilities": agent.get("capabilities", {}),
- "description": agent.get("description", "")
- }
- for agent in availableAgents
- ]
- }
-
- # Create prompt for AI to analyze and select next agent
- prompt = f"""
- Analyze the current workflow state, conversation history, and available agents to determine the most suitable next agent.
- Consider the following factors:
- 1. Previous agent's status and any errors
- 2. Required capabilities for the task
- 3. Document type compatibility
- 4. Language requirements
- 5. Agent's capabilities and specializations
- 6. Conversation history and context
-
- Current State:
- {json.dumps(context['current_state'], indent=2)}
-
- Conversation History:
- {context['conversation_history']}
-
- Available Agents:
- {json.dumps(context['available_agents'], indent=2)}
-
- Return a JSON object with the following structure:
- {{
- "selected_agent": "name of the most suitable agent",
- "reasoning": "brief explanation of why this agent was selected",
- "required_capabilities": ["list", "of", "required", "capabilities"],
- "potential_risks": ["list", "of", "potential", "issues"],
- "task": {{
- "description": "clear description of what the agent needs to do",
- "input_format": {{
- "documents": ["list", "of", "required", "input", "documents"],
- "data": ["list", "of", "required", "data", "fields"]
- }},
- "output_format": {{
- "documents": ["list", "of", "expected", "output", "documents"],
- "data": ["list", "of", "expected", "output", "fields"]
- }},
- "requirements": [
- "list of specific requirements",
- "format requirements",
- "quality requirements"
- ],
- "constraints": [
- "list of constraints",
- "time limits",
- "resource limits"
- ]
- }},
- "prompt_template": "template for the agent's prompt with placeholders for dynamic content"
- }}
-
- Format your response as a valid JSON object.
- """
-
- # Get AI's analysis and selection
- response = await self._callAiComplex(prompt)
-
- try:
- analysis = json.loads(response)
- selected_agent_name = analysis.get('selected_agent')
-
- # Find the selected agent in available agents
- selected_agent = next(
- (agent for agent in availableAgents if agent.get('name') == selected_agent_name),
- None
- )
-
- if selected_agent:
- logger.info(f"AI selected agent {selected_agent_name}: {analysis.get('reasoning')}")
- # Update handover with AI's analysis
- handover.requiredCapabilities = analysis.get('required_capabilities', [])
- handover.analysis = {
- 'reasoning': analysis.get('reasoning'),
- 'potential_risks': analysis.get('potential_risks', []),
- 'task': analysis.get('task', {}),
- 'prompt_template': analysis.get('prompt_template', '')
- }
- return selected_agent
- else:
- logger.warning(f"AI selected agent {selected_agent_name} not found in available agents")
- return None
-
- except json.JSONDecodeError as e:
- logger.error(f"Error parsing AI response: {str(e)}")
- return None
-
- except Exception as e:
- logger.error(f"Error selecting next agent: {str(e)}")
- return None
-
- async def processNextAgent(self, handover: AgentHandover) -> AgentHandover:
- """
- Process the next agent in the workflow.
-
- Args:
- handover: Current handover object
-
- Returns:
- Updated handover object
- """
- try:
- # Get agent instance
- agent = self.agentManager.getAgent(handover.nextAgent)
- if not agent:
- handover.update_status("failed", f"Agent {handover.nextAgent} not found")
- return handover
-
- # Set current agent
- handover.currentAgent = handover.nextAgent
- handover.nextAgent = None
-
- # Execute agent
- response = await agent.execute(handover)
-
- # Update handover with results
- if response.success:
- handover.update_status("success")
- handover.documentsOutput = response.message.documents if response.message else []
- handover.promptFromFinishedAgent = response.message.message if response.message else ""
- else:
- handover.update_status("failed", response.error)
-
- return handover
-
- except Exception as e:
- logger.error(f"Error processing next agent: {str(e)}")
- handover.update_status("failed", str(e))
- return handover
-
- # Agent functions
-
- async def _callAiBasic(self, prompt: str, context: Dict[str, Any] = None) -> str:
- """Call basic AI model."""
- try:
- response = await self.service.base.callAi(prompt, context or {}, model="aiBase")
- return response
- except Exception as e:
- logger.error(f"Error calling basic AI: {str(e)}")
- return ""
-
- async def _callAiComplex(self, prompt: str, context: Dict[str, Any] = None) -> str:
- """Call complex AI model."""
- try:
- response = await self.service.base.callAi(prompt, context or {}, model="aiComplex")
- return response
- except Exception as e:
- logger.error(f"Error calling complex AI: {str(e)}")
- return ""
-
- async def _callAiImage(self, prompt: str, context: Dict[str, Any] = None) -> str:
- """Call image AI model."""
- try:
- response = await self.service.base.callAi(prompt, context or {}, model="aiImage")
- return response
- except Exception as e:
- logger.error(f"Error calling image AI: {str(e)}")
- return ""
-
- def logAdd(self, message: str, level: str = "info",
- progress: Optional[int] = None) -> str:
- """
- Add a log entry to the workflow.
-
- Args:
- message: Log message
- level: Log level (info, warning, error)
- progress: Optional progress percentage
-
- Returns:
- str: ID of the created log entry
- """
- workflow = self.service.workflow
- try:
- # Generate log ID
- logId = str(uuid.uuid4())
-
- # Create log entry
- logEntry = ChatLog(
- id=logId,
- workflowId=workflow.id,
- message=message,
- level=level,
- progress=progress,
- timestamp=datetime.now().isoformat()
- )
-
- # Add to workflow logs
- workflow.logs.append(logEntry)
-
- # Also log to Python logger
- logLevel = getattr(logging, level.upper())
- logger.log(logLevel, f"[Workflow {workflow.id}] {message}")
-
- # Save to database
- self.chatManager.saveWorkflowLog(workflow.id, logEntry.to_dict())
-
- return logId
-
- except Exception as e:
- logger.error(f"Error adding log entry: {str(e)}")
- return ""
-
- async def chatMessageToWorkflow(self, role: str, agent: Union[str, Dict[str, Any]], chatMessage: UserInputRequest) -> ChatMessage:
- """
- Integrates chat message input into a Message object including files with complete contents.
-
- Args:
- role: Role of the message sender (e.g., 'user', 'assistant')
- agent: Agent name or configuration
- chatMessage: UserInputRequest object containing message data and file references
-
- Returns:
- ChatMessage object with complete file contents
- """
- try:
- # Process additional files with complete contents
- additionalFileIds = chatMessage.listFileId or []
- additionalFiles = await self.processFileIds(additionalFileIds)
-
- # Create message object
- message = ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=self.service.workflow.id,
- role=role,
- agentName=agent if isinstance(agent, str) else agent.get("name", ""),
- message=chatMessage.message,
- documents=additionalFiles,
- status="completed",
- startedAt=datetime.now().isoformat()
- )
-
- return message
-
- except Exception as e:
- logger.error(f"Error creating workflow message: {str(e)}")
- raise
-
- async def sendFinalMessage(self, handover: AgentHandover) -> ChatMessage:
- """
- Send final message to user with workflow results.
-
- Args:
- handover: Final handover object
-
- Returns:
- Final message to user
- """
- try:
- # Create final message content from handover
- messageContent = handover.promptFromFinishedAgent
- if handover.status == "failed":
- messageContent = f"Workflow failed: {handover.error}"
-
- # Add summary of generated documents
- if handover.documentsOutput:
- messageContent += "\n\nGenerated documents:"
- for doc in handover.documentsOutput:
- messageContent += f"\n- {doc.get('name', 'Unknown')}"
-
- # Create message object
- finalMessage = ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=self.service.workflow.id,
- agentName="Workflow Manager",
- message=messageContent,
- role="assistant",
- status="completed",
- sequenceNr=0,
- startedAt=datetime.now(UTC).isoformat(),
- finishedAt=datetime.now(UTC).isoformat(),
- success=handover.status == "success",
- documents=handover.documentsOutput
- )
-
- return finalMessage
-
- except Exception as e:
- logger.error(f"Error sending final message: {str(e)}")
- return ChatMessage(
- id=str(uuid.uuid4()),
- workflowId=self.service.workflow.id,
- agentName="Workflow Manager",
- message=f"Error in workflow: {str(e)}",
- role="system",
- status="error",
- sequenceNr=0,
- startedAt=datetime.now(UTC).isoformat(),
- finishedAt=datetime.now(UTC).isoformat(),
- success=False
- )
-
- async def workflowSummarize(self, messageUser: ChatMessage) -> str:
- """
- Creates a summary of the workflow without the current user message.
-
- Args:
- messageUser: Current user message
-
- Returns:
- Summary of the workflow
- """
- if not self.service.workflow or "messages" not in self.service.workflow or not self.service.workflow["messages"]:
- return "" # First message
-
- # Go through messages in chronological order
- messages = sorted(self.service.workflow["messages"], key=lambda m: m.get("sequenceNo", 0), reverse=False)
-
- summaryParts = []
- for message in messages:
- if message["id"] != messageUser["id"]:
- messageSummary = await self.messageSummarize(message)
- summaryParts.append(messageSummary)
-
- return "\n\n".join(summaryParts)
-
- async def messageSummarize(self, message: ChatMessage) -> str:
- """
- Creates a summary of a message including its documents.
-
- Args:
- message: Message to summarize
-
- Returns:
- Summary of the message
- """
- role = message.role
- agentName = message.agentName
- content = message.content
-
- try:
- # Use the serviceBase for language-aware AI calls
- prompt = f"Create a very concise summary (2-3 sentences, maximum 300 characters) of the following message:\n\n{content}"
- contentSummary = await self._callAiBasic(prompt)
- except Exception as e:
- logger.error(f"Error creating summary: {str(e)}")
- contentSummary = content[:200] + "..."
-
- # Summarize documents
- docsSummary = ""
- if "documents" in message and message["documents"]:
- docsList = []
- for i, doc in enumerate(message["documents"]):
- docName = self.getFilename(doc)
- docsList.append(docName)
- if docsList:
- docsSummary = "\nDocuments:" + "\n- ".join(docsList)
-
- return f"[{role} {agentName}]: {contentSummary}{docsSummary}"
-
- def getFilename(self, document: ChatDocument) -> str:
- """
- Gets the filename from a document by combining name and extension.
-
- Args:
- document: Document object
-
- Returns:
- Filename with extension
- """
- name = document.name
- ext = document.ext
- if ext:
- return f"{name}.{ext}"
- return name
-
- async def _detectUserLanguage(self, text: str) -> str:
- """
- Detects the language of user input using AI.
-
- Args:
- text: User input text to analyze
-
- Returns:
- Language code (e.g., 'en', 'de', 'fr')
- """
- try:
- # Use basic AI model for language detection
- prompt = f"""
- Analyze the following text and identify its language.
- Return only the ISO 639-1 language code (e.g., 'en' for English, 'de' for German).
-
- Text: {text}
- """
- response = await self._callAiBasic(prompt)
- # Clean and validate response
- lang_code = response.strip().lower()
- # Basic validation of common language codes
- valid_codes = {'en', 'de', 'fr', 'es', 'it', 'pt', 'nl', 'ru', 'zh', 'ja', 'ko'}
- return lang_code if lang_code in valid_codes else 'en'
- except Exception as e:
- logger.error(f"Error detecting language: {str(e)}")
- return 'en' # Default to English on error
-
-
-# Singleton factory for the chat manager
-def getChatManager():
- return ChatManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/dataAccessFunctions.py b/modules/workflow/dataAccessFunctions.py
deleted file mode 100644
index 87448544..00000000
--- a/modules/workflow/dataAccessFunctions.py
+++ /dev/null
@@ -1,273 +0,0 @@
-"""
-Data access functions for Microsoft and Google services.
-Provides standardized interfaces for SharePoint, Outlook, and other services.
-"""
-
-from typing import List, Dict, Any, Optional, Union
-from datetime import datetime
-from pydantic import BaseModel, Field
-from enum import Enum
-
-class ServiceType(str, Enum):
- """Service types for data access"""
- MSFT = "msft"
- GOOGLE = "google"
-
-class FileRef(BaseModel):
- """Reference to a file in storage"""
- id: str
- name: str
- path: str
- url: Optional[str] = None
- size: Optional[int] = None
- lastModified: Optional[datetime] = None
-
-# SharePoint Functions
-class SharePointSearchParams(BaseModel):
- """Parameters for SharePoint search"""
- userName: str
- query: str
- site: Optional[str] = None
- folder: Optional[str] = None
- contentType: Optional[str] = None
- createdAfter: Optional[datetime] = None
- modifiedAfter: Optional[datetime] = None
- maxResults: Optional[int] = 100
-
-class SharePointFolderParams(BaseModel):
- """Parameters for SharePoint folder operations"""
- userName: str
- folderPattern: str
- site: Optional[str] = None
- recursive: bool = False
- includeFiles: bool = True
-
-class SharePointFileParams(BaseModel):
- """Parameters for SharePoint file operations"""
- userName: str
- fileName: str
- site: Optional[str] = None
- folder: Optional[str] = None
- content: Optional[bytes] = None
- contentType: Optional[str] = None
-
-async def Msft_Sharepoint_Search(params: SharePointSearchParams) -> List[Dict[str, Any]]:
- """Search SharePoint for files and folders matching criteria"""
- # Implementation would go here
- pass
-
-async def Msft_Sharepoint_GetFolders(params: SharePointFolderParams) -> Dict[str, Any]:
- """Get SharePoint folders matching pattern"""
- # Implementation would go here
- pass
-
-async def Msft_Sharepoint_GetFiles(params: SharePointFileParams) -> Dict[str, Any]:
- """Get SharePoint files matching pattern"""
- # Implementation would go here
- pass
-
-async def Msft_Sharepoint_GetFile(params: SharePointFileParams) -> Dict[str, Any]:
- """Get specific SharePoint file"""
- # Implementation would go here
- pass
-
-async def Msft_Sharepoint_PutFile(params: SharePointFileParams) -> FileRef:
- """Upload file to SharePoint"""
- # Implementation would go here
- pass
-
-# Outlook Mail Functions
-class OutlookMailParams(BaseModel):
- """Parameters for Outlook mail operations"""
- userName: str
- folder: Optional[str] = None
- messageId: Optional[str] = None
- subject: Optional[str] = None
- body: Optional[str] = None
- to: Optional[List[str]] = None
- cc: Optional[List[str]] = None
- bcc: Optional[List[str]] = None
- attachments: Optional[List[FileRef]] = None
- searchString: Optional[str] = None
- fromAddress: Optional[str] = None
- receivedAfter: Optional[datetime] = None
- maxResults: Optional[int] = 100
-
-async def Msft_Outlook_ReadMails(params: OutlookMailParams) -> List[Dict[str, Any]]:
- """Read multiple emails from Outlook"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_ReadMail(params: OutlookMailParams) -> Dict[str, Any]:
- """Read specific email from Outlook"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_DraftMail(params: OutlookMailParams) -> Dict[str, Any]:
- """Create draft email in Outlook"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_SendMail(params: OutlookMailParams) -> Dict[str, Any]:
- """Send email through Outlook"""
- # Implementation would go here
- pass
-
-# Outlook Calendar Functions
-class OutlookCalendarParams(BaseModel):
- """Parameters for Outlook calendar operations"""
- userName: str
- calendar: Optional[str] = None
- eventId: Optional[str] = None
- subject: Optional[str] = None
- body: Optional[str] = None
- startTime: Optional[datetime] = None
- endTime: Optional[datetime] = None
- location: Optional[str] = None
- organizer: Optional[str] = None
- attendees: Optional[List[str]] = None
- searchString: Optional[str] = None
- maxResults: Optional[int] = 100
-
-async def Msft_Outlook_ReadAppointments(params: OutlookCalendarParams) -> List[Dict[str, Any]]:
- """Read multiple calendar appointments"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_CreateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]:
- """Create new calendar appointment"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_ReadAppointment(params: OutlookCalendarParams) -> Dict[str, Any]:
- """Read specific calendar appointment"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_UpdateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]:
- """Update existing calendar appointment"""
- # Implementation would go here
- pass
-
-async def Msft_Outlook_DeleteAppointment(params: OutlookCalendarParams) -> bool:
- """Delete calendar appointment"""
- # Implementation would go here
- pass
-
-def get_data_access_functions() -> List[Dict[str, Any]]:
- """
- Dynamically generates a comprehensive list of all available data access functions
- with their parameters for use in agent prompts.
- """
- import inspect
- import sys
-
- functions = []
- current_module = sys.modules[__name__]
-
- # Get all functions in the module
- for name, obj in inspect.getmembers(current_module):
- # Check if it's a function and starts with Msft_ or Google_
- if inspect.isfunction(obj) and (name.startswith('Msft_') or name.startswith('Google_')):
- # Get function signature
- sig = inspect.signature(obj)
-
- # Get return type annotation
- return_type = obj.__annotations__.get('return', 'Any')
- if hasattr(return_type, '__origin__'):
- return_type = str(return_type)
-
- # Get parameter model class
- param_model = None
- for param in sig.parameters.values():
- if param.annotation.__module__ == __name__:
- param_model = param.annotation
- break
-
- # Determine authority from function name
- authority = ServiceType.MSFT if name.startswith('Msft_') else ServiceType.GOOGLE
-
- # Create function entry
- function_entry = {
- "name": name,
- "description": obj.__doc__ or "",
- "parameters": param_model.schema() if param_model else {},
- "return_type": str(return_type),
- "authority": authority
- }
-
- functions.append(function_entry)
-
- return functions
-
-class DataAccess:
- """Manages data access functions for different services"""
-
- def __init__(self):
- """Initialize the data access manager"""
- self.functions = get_data_access_functions()
- self._initialize_functions()
-
- def _initialize_functions(self):
- """Initialize function groups and metadata"""
- # Group functions by service type
- self.msft_functions = {}
- self.google_functions = {}
-
- for func in self.functions:
- func_name = func['name']
- # Get the actual function object
- func_obj = globals()[func_name]
-
- if func['authority'] == ServiceType.MSFT:
- self.msft_functions[func_name] = func_obj
- else:
- self.google_functions[func_name] = func_obj
-
- @property
- def msft(self) -> Dict[str, Any]:
- """Get Microsoft service functions and metadata"""
- return {
- 'functions': self.msft_functions,
- 'metadata': {
- 'name': 'Microsoft Services',
- 'description': 'Microsoft Office 365 and SharePoint services',
- 'functions': [f for f in self.functions if f['authority'] == ServiceType.MSFT]
- }
- }
-
- @property
- def google(self) -> Dict[str, Any]:
- """Get Google service functions and metadata"""
- return {
- 'functions': self.google_functions,
- 'metadata': {
- 'name': 'Google Services',
- 'description': 'Google Workspace services',
- 'functions': [f for f in self.functions if f['authority'] == ServiceType.GOOGLE]
- }
- }
-
- @property
- def utils(self) -> Dict[str, Any]:
- """Get utility functions for data access"""
- return {
- 'getAvailableFunctions': lambda: self.functions,
- 'getFunctionInfo': lambda name: next((f for f in self.functions if f['name'] == name), None),
- 'getServiceFunctions': lambda service_type: [f for f in self.functions if f['authority'] == service_type]
- }
-
- def to_service_object(self) -> Dict[str, Any]:
- """Convert to service object format"""
- return {
- 'msft': self.msft,
- 'google': self.google,
- 'utils': self.utils
- }
-
-def get_data_access() -> DataAccess:
- """Get a singleton instance of the data access manager"""
- if not hasattr(get_data_access, '_instance'):
- get_data_access._instance = DataAccess()
- return get_data_access._instance
-
diff --git a/modules/workflow/documentManager.py b/modules/workflow/documentManager.py
deleted file mode 100644
index 6e627c3a..00000000
--- a/modules/workflow/documentManager.py
+++ /dev/null
@@ -1,396 +0,0 @@
-"""
-Document Manager Module for handling document operations and content extraction.
-"""
-
-import logging
-from typing import Dict, Any, List, Optional
-from datetime import datetime
-from modules.interfaces.serviceChatModel import ChatDocument, ChatContent
-from modules.workflow.documentProcessor import getDocumentContents
-import uuid
-import json
-import base64
-
-logger = logging.getLogger(__name__)
-
-class DocumentManager:
- """Manager for document operations and content extraction."""
-
- _instance = None
-
- @classmethod
- def getInstance(cls):
- """Return a singleton instance of the document manager."""
- if cls._instance is None:
- cls._instance = cls()
- return cls._instance
-
- def __init__(self):
- """Initialize the document manager."""
- if DocumentManager._instance is not None:
- raise RuntimeError("Singleton instance already exists - use getInstance()")
-
- self.service = None
-
- def initialize(self, service=None):
- """Initialize or update the manager with service references."""
- if service:
- # Validate required interfaces
- required_interfaces = ['base', 'msft', 'google']
- missing_interfaces = []
- for interface in required_interfaces:
- if not hasattr(service, interface):
- missing_interfaces.append(interface)
-
- if missing_interfaces:
- logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
- return False
-
- self.service = service
- return True
-
- async def extractContent(self, fileId: str) -> Optional[ChatDocument]:
- """
- Extract content from a file.
-
- Args:
- fileId: ID of the file to extract content from
-
- Returns:
- ChatDocument object if successful, None otherwise
- """
- try:
- # Get file content
- fileContent = await self.getFileContent(fileId)
- if not fileContent:
- return None
-
- # Get file metadata
- fileMetadata = await self.getFileMetadata(fileId)
- if not fileMetadata:
- return None
-
- # Create ChatDocument
- return ChatDocument(
- id=str(uuid.uuid4()),
- fileId=fileId,
- filename=fileMetadata.get("name", "Unknown"),
- fileSize=fileMetadata.get("size", 0),
- content=fileContent.decode('utf-8', errors='ignore'),
- mimeType=fileMetadata.get("mimeType", "text/plain")
- )
- except Exception as e:
- logger.error(f"Error extracting content from file {fileId}: {str(e)}")
- return None
-
- async def getFileContent(self, fileId: str) -> Optional[bytes]:
- """Gets the content of a file."""
- try:
- return self.service.functions.getFileData(fileId)
- except Exception as e:
- logger.error(f"Error getting file content for {fileId}: {str(e)}")
- return None
-
- async def getFileMetadata(self, fileId: str) -> Optional[Dict[str, Any]]:
- """Gets the metadata of a file."""
- try:
- return self.service.functions.getFile(fileId)
- except Exception as e:
- logger.error(f"Error getting file metadata for {fileId}: {str(e)}")
- return None
-
- async def saveFile(self, filename: str, content: bytes, mimeType: str) -> Optional[int]:
- """
- Save a new file.
-
- Args:
- filename: Name of the file
- content: File content as bytes
- mimeType: MIME type of the file
-
- Returns:
- File ID if successful, None otherwise
- """
- try:
- return await self.service.base.saveFile(filename, content, mimeType)
- except Exception as e:
- logger.error(f"Error saving file {filename}: {str(e)}")
- return None
-
- async def deleteFile(self, fileId: str) -> bool:
- """Deletes a file."""
- try:
- return self.service.functions.deleteFile(fileId)
- except Exception as e:
- logger.error(f"Error deleting file {fileId}: {str(e)}")
- return False
-
- async def convertFileRefToId(self, ref: str) -> Optional[int]:
- """
- Convert agent file reference to file ID.
-
- Args:
- ref: File reference in format 'filename;id' or just 'id'
-
- Returns:
- File ID if successful, None otherwise
- """
- try:
- # Extract file ID from reference format
- if isinstance(ref, str) and ';' in ref:
- return int(ref.split(';')[1])
- return int(ref)
- except Exception as e:
- logger.error(f"Error converting file reference to ID: {str(e)}")
- return None
-
- async def convertFileIdToRef(self, fileId: str) -> Optional[str]:
- """
- Convert file ID to agent file reference.
-
- Args:
- fileId: File ID to convert
-
- Returns:
- File reference in format 'filename;id' if successful, None otherwise
- """
- try:
- file = await self.getFileMetadata(fileId)
- if not file:
- return None
- return f"{file['name']};{fileId}"
- except Exception as e:
- logger.error(f"Error converting file ID to reference: {str(e)}")
- return None
-
- async def convertDataFormat(self, data: Any, format: str) -> Any:
- """
- Convert data between different formats.
-
- Args:
- data: Data to convert
- format: Target format ('json', 'base64', etc.)
-
- Returns:
- Converted data
- """
- try:
- if format == 'json':
- if isinstance(data, str):
- return json.loads(data)
- return json.dumps(data)
- elif format == 'base64':
- if isinstance(data, str):
- return base64.b64encode(data.encode('utf-8')).decode('utf-8')
- return base64.b64encode(data).decode('utf-8')
- return data
- except Exception as e:
- logger.error(f"Error converting data format: {str(e)}")
- return data
-
- async def createAgentInputFileList(self, files: List[str]) -> List[Dict[str, Any]]:
- """
- Create a list of input files for agent processing.
-
- Args:
- files: List of file references
-
- Returns:
- List of file objects with content
- """
- try:
- inputFiles = []
- for file in files:
- fileId = await self.convertFileRefToId(file)
- if fileId:
- fileData = await self.getFileMetadata(fileId)
- if fileData:
- content = await self.getFileContent(fileId)
- inputFiles.append({
- 'id': fileId,
- 'name': fileData['name'],
- 'mimeType': fileData['mimeType'],
- 'content': content
- })
- return inputFiles
- except Exception as e:
- logger.error(f"Error creating agent input file list: {str(e)}")
- return []
-
- async def saveAgentOutputFiles(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
- """
- Save output files from agent processing.
-
- Args:
- files: List of file objects with content
-
- Returns:
- List of saved file metadata
- """
- try:
- savedFiles = []
- for file in files:
- # Create file metadata
- fileMeta = await self.saveFile(
- filename=file['name'],
- content=file['content'],
- mimeType=file.get('mimeType', 'application/octet-stream')
- )
-
- if fileMeta:
- savedFiles.append({
- 'id': fileMeta,
- 'name': file['name'],
- 'mimeType': file.get('mimeType', 'application/octet-stream')
- })
- return savedFiles
- except Exception as e:
- logger.error(f"Error saving agent output files: {str(e)}")
- return []
-
- async def contentWithPrompt(self, document: Dict[str, Any], prompt: str) -> Optional[Dict[str, Any]]:
- """
- Extract content from a document using AI with a specific prompt.
- Handles large files by processing in chunks and merging results.
-
- Args:
- document: Document object with file information
- prompt: Specific prompt for content extraction
-
- Returns:
- Dictionary with extracted content and metadata
- """
- try:
- # First get the document content
- chat_doc = await self.extractContent(document.get('id'))
- if not chat_doc:
- return None
-
- # Prepare the content for AI processing
- content = chat_doc.content
- mime_type = chat_doc.mimeType
-
- # For large files, process in chunks
- if len(content) > 100000: # Arbitrary threshold, adjust as needed
- chunks = self._splitContentIntoChunks(content, mime_type)
- extracted_chunks = []
-
- for chunk in chunks:
- # Process each chunk with AI
- chunk_result = await self._processContentChunk(chunk, prompt)
- if chunk_result:
- extracted_chunks.append(chunk_result)
-
- # Merge results
- return {
- "content": self._mergeChunkResults(extracted_chunks),
- "metadata": {
- "original_size": len(content),
- "chunks_processed": len(chunks),
- "mime_type": mime_type
- }
- }
- else:
- # Process single chunk
- result = await self._processContentChunk(content, prompt)
- return {
- "content": result,
- "metadata": {
- "original_size": len(content),
- "chunks_processed": 1,
- "mime_type": mime_type
- }
- }
-
- except Exception as e:
- logger.error(f"Error in contentWithPrompt: {str(e)}")
- return None
-
- def _splitContentIntoChunks(self, content: str, mime_type: str) -> List[str]:
- """
- Split content into manageable chunks based on mime type.
-
- Args:
- content: Content to split
- mime_type: MIME type of the content
-
- Returns:
- List of content chunks
- """
- try:
- if mime_type.startswith('text/'):
- # Split text content by paragraphs or sections
- return [chunk.strip() for chunk in content.split('\n\n') if chunk.strip()]
- elif mime_type == 'application/json':
- # Split JSON content by objects
- data = json.loads(content)
- if isinstance(data, list):
- return [json.dumps(item) for item in data]
- return [content]
- else:
- # Default chunking
- return [content[i:i+10000] for i in range(0, len(content), 10000)]
- except Exception as e:
- logger.error(f"Error splitting content: {str(e)}")
- return [content]
-
- async def _processContentChunk(self, chunk: str, prompt: str) -> Optional[str]:
- """
- Process a single content chunk with AI.
-
- Args:
- chunk: Content chunk to process
- prompt: Extraction prompt
-
- Returns:
- Processed content
- """
- try:
- # Create AI prompt
- ai_prompt = f"""
- Extract relevant information from this content based on the following prompt:
-
- PROMPT: {prompt}
-
- CONTENT:
- {chunk}
-
- Return ONLY the extracted information in a clear, concise format.
- """
-
- # Get AI response
- response = await self.service.base.callAi([
- {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
- {"role": "user", "content": ai_prompt}
- ])
-
- return response.strip()
-
- except Exception as e:
- logger.error(f"Error processing content chunk: {str(e)}")
- return None
-
- def _mergeChunkResults(self, chunks: List[str]) -> str:
- """
- Merge processed content chunks into a single result.
-
- Args:
- chunks: List of processed chunks
-
- Returns:
- Merged content
- """
- try:
- # Remove duplicates and empty chunks
- chunks = [chunk for chunk in chunks if chunk and chunk.strip()]
-
- # Merge chunks with appropriate spacing
- return "\n\n".join(chunks)
-
- except Exception as e:
- logger.error(f"Error merging chunk results: {str(e)}")
- return ""
-
-# Singleton factory for the document manager
-def getDocumentManager():
- return DocumentManager.getInstance()
\ No newline at end of file
diff --git a/modules/workflow/documentProcessor.py b/modules/workflow/documentProcessor.py
deleted file mode 100644
index bd099128..00000000
--- a/modules/workflow/documentProcessor.py
+++ /dev/null
@@ -1,1008 +0,0 @@
-"""
-Module for extracting content from various file formats.
-Provides specialized functions for processing text, PDF, Office documents, images, etc.
-"""
-
-import logging
-import os
-import io
-from typing import Dict, Any, List, Optional, Union, Tuple
-import base64
-from modules.interfaces.serviceChatModel import ChatContent
-
-# Configure logger
-logger = logging.getLogger(__name__)
-
-# Optional imports - only loaded when needed
-pdfExtractorLoaded = False
-officeExtractorLoaded = False
-imageProcessorLoaded = False
-
-class FileProcessingError(Exception):
- """Custom exception for file processing errors."""
- pass
-
-def getDocumentContents(fileMetadata: Dict[str, Any], fileContent: bytes) -> List[ChatContent]:
- """
- Main function for extracting content from a file based on its MIME type.
- Delegates to specialized extraction functions.
-
- Args:
- fileMetadata: File metadata (Name, MIME type, etc.)
- fileContent: Binary data of the file
-
- Returns:
- List of ChatContent objects with metadata and base64Encoded flag
- """
- try:
- mimeType = fileMetadata.get("mimeType", "application/octet-stream")
- fileName = fileMetadata.get("name", "unknown")
-
- logger.info(f"Extracting content from file '{fileName}' (MIME type: {mimeType})")
-
- # Extract content based on MIME type
- contents = []
-
- # Try to detect actual file type from content for unknown MIME types
- if mimeType == "application/octet-stream":
- # Check file extension first
- ext = os.path.splitext(fileName)[1].lower()
- if ext:
- # Map common extensions to MIME types
- ext_to_mime = {
- '.txt': 'text/plain',
- '.md': 'text/markdown',
- '.csv': 'text/csv',
- '.json': 'application/json',
- '.xml': 'application/xml',
- '.js': 'application/javascript',
- '.py': 'application/x-python',
- '.svg': 'image/svg+xml',
- '.jpg': 'image/jpeg',
- '.jpeg': 'image/jpeg',
- '.png': 'image/png',
- '.gif': 'image/gif',
- '.pdf': 'application/pdf',
- '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
- '.doc': 'application/msword',
- '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- '.xls': 'application/vnd.ms-excel',
- '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
- '.ppt': 'application/vnd.ms-powerpoint'
- }
- if ext in ext_to_mime:
- mimeType = ext_to_mime[ext]
- logger.info(f"Detected MIME type {mimeType} from extension {ext}")
- else:
- logger.warning(f"Unknown file extension {ext} for file {fileName}")
-
- # Try to detect if it's text content
- try:
- text_content = fileContent.decode('utf-8')
- logger.info(f"Successfully decoded file {fileName} as text")
- contents.extend(extractTextContent(fileName, fileContent, "text/plain"))
- except UnicodeDecodeError:
- logger.info(f"File {fileName} is not text, treating as binary")
- contents.extend(extractBinaryContent(fileName, fileContent, mimeType))
-
- # Text-based formats (excluding CSV which has its own handler)
- elif mimeType == "text/csv":
- contents.extend(extractCsvContent(fileName, fileContent))
-
- # Then handle other text-based formats
- elif mimeType.startswith("text/") or mimeType in [
- "application/json",
- "application/xml",
- "application/javascript",
- "application/x-python"
- ]:
- contents.extend(extractTextContent(fileName, fileContent, mimeType))
-
- # SVG Files
- elif mimeType == "image/svg+xml":
- contents.extend(extractSvgContent(fileName, fileContent))
-
- # Images
- elif mimeType.startswith("image/"):
- contents.extend(extractImageContent(fileName, fileContent, mimeType))
-
- # PDF Documents
- elif mimeType == "application/pdf":
- contents.extend(extractPdfContent(fileName, fileContent))
-
- # Word Documents
- elif mimeType in [
- "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
- "application/msword"
- ]:
- contents.extend(extractWordContent(fileName, fileContent, mimeType))
-
- # Excel Documents
- elif mimeType in [
- "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
- "application/vnd.ms-excel"
- ]:
- contents.extend(extractExcelContent(fileName, fileContent, mimeType))
-
- # PowerPoint Documents
- elif mimeType in [
- "application/vnd.openxmlformats-officedocument.presentationml.presentation",
- "application/vnd.ms-powerpoint"
- ]:
- contents.extend(extractPowerpointContent(fileName, fileContent, mimeType))
-
- # Binary data as fallback for unknown formats
- else:
- logger.warning(f"Unknown MIME type {mimeType} for file {fileName}, treating as binary")
- contents.extend(extractBinaryContent(fileName, fileContent, mimeType))
-
- # Fallback when no content could be extracted
- if not contents:
- logger.warning(f"No content extracted from file '{fileName}', using binary fallback")
-
- # Convert binary content to base64
- encoded_data = base64.b64encode(fileContent).decode('utf-8')
-
- contents.append(ChatContent(
- sequenceNr=1,
- name='1_undefined',
- mimeType=mimeType,
- data=encoded_data,
- metadata={
- "isText": False,
- "base64Encoded": True
- }
- ))
-
- # Add generic attributes for all documents
- for content in contents:
- # Make sure all content items have the base64Encoded flag
- if not hasattr(content, "base64Encoded"):
- if isinstance(content.data, bytes):
- # Convert bytes to base64
- content.data = base64.b64encode(content.data).decode('utf-8')
- content.base64Encoded = True
- else:
- # Assume text content if not explicitly marked
- content.base64Encoded = False
-
- # Maintain backward compatibility with old "base64Encoded" flag in metadata
- if not content.metadata:
- content.metadata = {}
-
- # Set base64Encoded in metadata for backward compatibility
- content.metadata["base64Encoded"] = content.base64Encoded
-
- logger.info(f"Successfully extracted {len(contents)} content items from file '{fileName}'")
- return contents
-
- except Exception as e:
- logger.error(f"Error during content extraction for file {fileMetadata.get('name', 'unknown')}: {str(e)}", exc_info=True)
- # Fallback on error - return original data
- return [ChatContent(
- sequenceNr=1,
- name=fileMetadata.get("name", "unknown"),
- mimeType=fileMetadata.get("mimeType", "application/octet-stream"),
- data=base64.b64encode(fileContent).decode('utf-8'),
- metadata={
- "isText": False,
- "base64Encoded": True
- }
- )]
-
-
-def _loadPdfExtractor():
- """Loads PDF extraction libraries when needed"""
- global pdfExtractorLoaded
- if not pdfExtractorLoaded:
- try:
- global PyPDF2, fitz
- import PyPDF2
- import fitz # PyMuPDF for more extensive PDF processing
- pdfExtractorLoaded = True
- logger.info("PDF extraction libraries successfully loaded")
- except ImportError as e:
- logger.warning(f"PDF extraction libraries could not be loaded: {e}")
-
-def _loadOfficeExtractor():
- """Loads Office document extraction libraries when needed"""
- global officeExtractorLoaded
- if not officeExtractorLoaded:
- try:
- global docx, openpyxl
- import docx # python-docx for Word documents
- import openpyxl # for Excel files
- officeExtractorLoaded = True
- logger.info("Office extraction libraries successfully loaded")
- except ImportError as e:
- logger.warning(f"Office extraction libraries could not be loaded: {e}")
-
-def _loadImageProcessor():
- """Loads image processing libraries when needed"""
- global imageProcessorLoaded
- if not imageProcessorLoaded:
- try:
- global PIL, Image
- from PIL import Image
- imageProcessorLoaded = True
- logger.info("Image processing libraries successfully loaded")
- except ImportError as e:
- logger.warning(f"Image processing libraries could not be loaded: {e}")
-
-def extractTextContent(fileName: str, fileContent: bytes, mimeType: str) -> List[Dict[str, Any]]:
- """
- Extracts text from text files.
-
- Args:
- fileName: Name of the file
- fileContent: Binary data of the file
- mimeType: MIME type of the file
-
- Returns:
- List of Text-Content objects with base64Encoded = False
- """
- try:
- # Keep original file extension
- fileExtension = os.path.splitext(fileName)[1][1:] if os.path.splitext(fileName)[1] else "txt"
-
- # Extract text content
- textContent = fileContent.decode('utf-8')
- return [{
- "sequenceNr": 1,
- "name": "1_text", # Simplified naming
- "ext": fileExtension,
- "mimeType": "text/plain",
- "data": textContent,
- "base64Encoded": False,
- "metadata": {
- "isText": True
- }
- }]
- except UnicodeDecodeError:
- logger.warning(f"Could not decode text from file '{fileName}' as UTF-8, trying alternative encodings")
- try:
- # Try alternative encodings
- for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
- try:
- textContent = fileContent.decode(encoding)
- logger.info(f"Text successfully decoded with encoding {encoding}")
- return [{
- "sequenceNr": 1,
- "name": "1_text", # Simplified naming
- "ext": fileExtension,
- "mimeType": "text/plain",
- "data": textContent,
- "base64Encoded": False,
- "metadata": {
- "isText": True,
- "encoding": encoding
- }
- }]
- except UnicodeDecodeError:
- continue
-
- # Fallback to binary data if no encoding works
- logger.warning(f"Could not decode text, using binary data")
- return [{
- "sequenceNr": 1,
- "name": "1_binary", # Simplified naming
- "ext": fileExtension,
- "mimeType": mimeType,
- "data": base64.b64encode(fileContent).decode('utf-8'),
- "base64Encoded": True,
- "metadata": {
- "isText": False
- }
- }]
- except Exception as e:
- logger.error(f"Error in alternative text decoding: {str(e)}")
- # Return binary data as fallback
- return [{
- "sequenceNr": 1,
- "name": "1_binary", # Simplified naming
- "ext": fileExtension,
- "mimeType": mimeType,
- "data": base64.b64encode(fileContent).decode('utf-8'),
- "base64Encoded": True,
- "metadata": {
- "isText": False
- }
- }]
-
-def extractCsvContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]:
- """
- Extracts content from CSV files.
-
- Args:
- fileName: Name of the file
- fileContent: Binary data of the file
-
- Returns:
- List of CSV-Content objects with base64Encoded = False
- """
- try:
- # Extract text content
- csvContent = fileContent.decode('utf-8')
- return [{
- "sequenceNr": 1,
- "name": "1_csv", # Simplified naming
- "ext": "csv",
- "mimeType": "text/csv",
- "data": csvContent,
- "base64Encoded": False,
- "metadata": {
- "isText": True,
- "format": "csv"
- }
- }]
- except UnicodeDecodeError:
- logger.warning(f"Could not decode CSV from file '{fileName}' as UTF-8, trying alternative encodings")
- try:
- # Try alternative encodings for CSV
- for encoding in ['latin-1', 'cp1252', 'iso-8859-1']:
- try:
- csvContent = fileContent.decode(encoding)
- logger.info(f"CSV successfully decoded with encoding {encoding}")
- return [{
- "sequenceNr": 1,
- "name": "1_csv", # Simplified naming
- "ext": "csv",
- "mimeType": "text/csv",
- "data": csvContent,
- "base64Encoded": False,
- "metadata": {
- "isText": True,
- "encoding": encoding,
- "format": "csv"
- }
- }]
- except UnicodeDecodeError:
- continue
-
- # Fallback to binary data
- return [{
- "sequenceNr": 1,
- "name": "1_binary", # Simplified naming
- "ext": "csv",
- "mimeType": "text/csv",
- "data": base64.b64encode(fileContent).decode('utf-8'),
- "base64Encoded": True,
- "metadata": {
- "isText": False
- }
- }]
- except Exception as e:
- logger.error(f"Error in alternative CSV decoding: {str(e)}")
- return [{
- "sequenceNr": 1,
- "name": "1_binary", # Simplified naming
- "ext": "csv",
- "mimeType": "text/csv",
- "data": base64.b64encode(fileContent).decode('utf-8'),
- "base64Encoded": True,
- "metadata": {
- "isText": False
- }
- }]
-
-def extractSvgContent(fileName: str, fileContent: bytes) -> List[Dict[str, Any]]:
- """
- Extracts content from SVG files.
-
- Args:
- fileName: Name of the file
- fileContent: Binary data of the file
-
- Returns:
- List of SVG-Content objects with dual text/image metadata
- """
- contents = []
-
- try:
- # Extract SVG as text content (XML)
- svgText = fileContent.decode('utf-8')
-
- # Check if it's actually SVG by looking for the SVG tag
- if "