diff --git a/modules/agents/agentAnalyst.py b/modules/agents/agentAnalyst.py index c2d392f4..a8bc1637 100644 --- a/modules/agents/agentAnalyst.py +++ b/modules/agents/agentAnalyst.py @@ -13,7 +13,7 @@ from typing import Dict, Any, List, Optional import pandas as pd import matplotlib.pyplot as plt import seaborn as sns -from datetime import datetime +from datetime import datetime, UTC import hashlib import uuid import re @@ -26,7 +26,13 @@ import inspect from pydantic import BaseModel from modules.workflow.agentBase import AgentBase -from modules.interfaces.serviceChatModel import ChatContent +from modules.interfaces.serviceChatModel import ( + ChatContent, + ChatMessage, + ChatStat, + AgentResponse, + AgentHandover +) logger = logging.getLogger(__name__) @@ -54,74 +60,241 @@ class AgentAnalyst(AgentBase): """Set external dependencies for the agent.""" self.setService(serviceBase) - async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: + async def processTask(self, handover: AgentHandover) -> AgentResponse: """ Process a task by focusing on required outputs and using AI to guide the analysis process. Args: - task: Task dictionary with prompt, inputDocuments, outputSpecifications + handover: AgentHandover object containing task information Returns: - Dictionary with feedback and documents + AgentResponse object with execution results """ try: - # Extract task information - prompt = task.get("prompt", "") - outputSpecs = task.get("outputSpecifications", []) - workflow = task.get("context", {}).get("workflow", {}) + # 1. 
Initial Analysis & Planning + self.service.logAdd(handover.workflowId, "Starting analysis task...", level="info", progress=10) - # Check AI service - if not self.service or not self.service.base: - return { - "feedback": "The Analyst agent requires an AI service to function effectively.", - "documents": [] - } - - # Create analysis plan - if workflow: - self.service.logAdd(workflow, "Extracting data from documents...", level="info", progress=35) - analysisPlan = await self._createAnalysisPlan(prompt) - - # Check if this is truly an analysis task - if not analysisPlan.get("requiresAnalysis", True): - return { - "feedback": "This task doesn't appear to require analysis. Please try a different agent.", - "documents": [] - } - - # Analyze data - if workflow: - self.service.logAdd(workflow, "Analyzing task requirements...", level="info", progress=45) - analysisResults = await self._analyzeData(task, analysisPlan) - - # Format results into requested output documents - totalSpecs = len(outputSpecs) - for i, spec in enumerate(outputSpecs): - progress = 50 + int((i / totalSpecs) * 40) # Progress from 50% to 90% - self.service.logAdd(workflow, f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress) - - documents = await self._createOutputDocuments( - prompt, - analysisResults, - outputSpecs, - analysisPlan + # Generate extraction prompts for each file + extraction_prompts = await self._generateExtractionPrompts( + prompt=handover.promptUserInitial, + documents=handover.documentsUserInitial ) - # Generate feedback - feedback = analysisPlan.get("feedback", f"I analyzed '{prompt[:50]}...' and generated {len(documents)} output documents.") + # 2. 
Parallel Content Extraction with specific prompts + self.service.logAdd(handover.workflowId, "Extracting content from documents...", level="info", progress=20) - return { - "feedback": feedback, - "documents": documents - } + extracted_contents = [] + for doc, extraction_prompt in zip(handover.documentsUserInitial, extraction_prompts): + # Use document service for extraction with specific prompt + content_result = await self.service.document.contentWithPrompt(doc, extraction_prompt) + if content_result: + extracted_contents.append({ + "document": doc, + "content": content_result["content"], + "metadata": content_result["metadata"], + "extraction_prompt": extraction_prompt + }) + + # 3. Analysis & Reflection + self.service.logAdd(handover.workflowId, "Analyzing extracted content...", level="info", progress=50) + + analysis_results = await self._analyzeContent( + prompt=handover.promptUserInitial, + extracted_contents=extracted_contents + ) + + # 4. Response Generation & Handover Update + self.service.logAdd(handover.workflowId, "Generating response...", level="info", progress=80) + + # Create ChatMessage with results + response_message = ChatMessage( + id=str(uuid.uuid4()), + workflowId=handover.workflowId, + agentName=self.name, + message=analysis_results.get("feedback", ""), + role="assistant", + status="completed", + sequenceNr=handover.sequenceNr, + startedAt=handover.startedAt, + finishedAt=datetime.now(UTC).isoformat(), + success=True, + documents=analysis_results.get("documents", []), + stats=ChatStat( + processingTime=analysis_results.get("processing_time"), + tokenCount=analysis_results.get("token_count"), + successRate=1.0 + ) + ) + + # Update handover object + handover.status = "success" + handover.progress = 100.0 + handover.finishedAt = datetime.now(UTC).isoformat() + handover.documentsOutput = analysis_results.get("documents", []) + handover.promptFromFinishedAgent = analysis_results.get("feedback", "") + + return AgentResponse( + success=True, + 
message=response_message, + performance=analysis_results.get("performance", {}), + progress=100.0 + ) except Exception as e: - logger.error(f"Error during analysis: {str(e)}", exc_info=True) + logger.error(f"Error in analysis task: {str(e)}", exc_info=True) + + # Create error response + error_message = ChatMessage( + id=str(uuid.uuid4()), + workflowId=handover.workflowId, + agentName=self.name, + message=f"Error during analysis: {str(e)}", + role="system", + status="error", + sequenceNr=handover.sequenceNr, + startedAt=handover.startedAt, + finishedAt=datetime.now(UTC).isoformat(), + success=False + ) + + # Update handover with error + handover.status = "failed" + handover.error = str(e) + handover.finishedAt = datetime.now(UTC).isoformat() + + return AgentResponse( + success=False, + message=error_message, + performance={}, + progress=0.0 + ) + + async def _generateExtractionPrompts(self, prompt: str, documents: List[Dict[str, Any]]) -> List[str]: + """ + Generate specific extraction prompts for each document. + + Args: + prompt: The original user prompt + documents: List of documents to process + + Returns: + List of extraction prompts, one for each document + """ + try: + # Create prompt for AI to generate extraction prompts + prompt_generation = f""" + Generate specific extraction prompts for each document based on the user's request. + + USER REQUEST: {prompt} + + DOCUMENTS: + {json.dumps([{ + "name": doc.get("name", ""), + "type": doc.get("type", ""), + "size": doc.get("size", 0) + } for doc in documents], indent=2)} + + For each document, generate a specific extraction prompt that will help extract the most relevant information. + Consider: + 1. The document type and format + 2. The user's original request + 3. 
What specific information would be most useful + + Return a JSON array of prompts, one for each document: + [ + {{ + "document_name": "name of the document", + "extraction_prompt": "specific prompt for this document" + }} + ] + """ + + # Get AI's response + response = await self.service.base.callAi([ + {"role": "system", "content": "You are an expert at creating precise document extraction prompts."}, + {"role": "user", "content": prompt_generation} + ]) + + # Parse response + prompts_data = json.loads(response) + + # Map prompts to documents + extraction_prompts = [] + for doc in documents: + doc_prompt = next( + (p["extraction_prompt"] for p in prompts_data if p["document_name"] == doc.get("name")), + f"Extract all relevant information from {doc.get('name')} that relates to: {prompt}" + ) + extraction_prompts.append(doc_prompt) + + return extraction_prompts + + except Exception as e: + logger.error(f"Error generating extraction prompts: {str(e)}") + # Fallback to generic prompts + return [f"Extract all relevant information from {doc.get('name')} that relates to: {prompt}" + for doc in documents] + + async def _analyzeContent(self, prompt: str, extracted_contents: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze the extracted content and generate results. + + Args: + prompt: The original user prompt + extracted_contents: List of extracted content with metadata + + Returns: + Dictionary containing analysis results + """ + try: + # Create analysis prompt + analysis_prompt = f""" + Analyze the following extracted content and provide insights based on the user's request. + + USER REQUEST: {prompt} + + EXTRACTED CONTENT: + {json.dumps([{ + "document": content["document"].get("name", ""), + "content": content["content"], + "extraction_prompt": content["extraction_prompt"] + } for content in extracted_contents], indent=2)} + + Provide a comprehensive analysis that: + 1. Synthesizes information from all documents + 2. Identifies key insights and patterns + 3. 
Relates findings to the user's request + 4. Suggests potential visualizations or additional analysis + + Format your response as a JSON object with: + {{ + "insights": ["list of key insights"], + "patterns": ["list of identified patterns"], + "recommendations": ["list of recommendations"], + "visualizations": ["list of suggested visualizations"], + "feedback": "summary of findings" + }} + """ + + # Get AI's analysis + response = await self.service.base.callAi([ + {"role": "system", "content": "You are an expert data analyst."}, + {"role": "user", "content": analysis_prompt} + ]) + + # Parse and return results + return json.loads(response) + + except Exception as e: + logger.error(f"Error analyzing content: {str(e)}") return { - "feedback": f"Error during analysis: {str(e)}", - "documents": [] + "insights": [], + "patterns": [], + "recommendations": [], + "visualizations": [], + "feedback": f"Error during analysis: {str(e)}" } - + def _extractData(self, documents: List[Dict[str, Any]]) -> tuple: """ Extract data from documents, focusing on dataExtracted fields. diff --git a/modules/agents/z_agentAnalyst copy.py b/modules/agents/z_agentAnalyst copy.py new file mode 100644 index 00000000..c2d392f4 --- /dev/null +++ b/modules/agents/z_agentAnalyst copy.py @@ -0,0 +1,902 @@ +""" +Data analyst agent for analysis and interpretation of data. +Focuses on output-first design with AI-powered analysis. 
+""" + +import logging +import json +import io +import base64 +import os +import time +from typing import Dict, Any, List, Optional +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns +from datetime import datetime +import hashlib +import uuid +import re +import shutil +from pathlib import Path +import traceback +import sys +import importlib.util +import inspect +from pydantic import BaseModel + +from modules.workflow.agentBase import AgentBase +from modules.interfaces.serviceChatModel import ChatContent + +logger = logging.getLogger(__name__) + +class AgentAnalyst(AgentBase): + """AI-driven agent for data analysis and visualization""" + + def __init__(self): + """Initialize the data analysis agent""" + super().__init__() + self.name = "analyst" + self.label = "Data Analysis" + self.description = "Analyzes data using AI-powered insights and visualizations, produce diagrams and visualizations" + self.capabilities = [ + "dataAnalysis", + "statistics", + "visualization", + "dataInterpretation", + "reportGeneration" + ] + + # Set default visualization settings + plt.style.use('seaborn-v0_8-whitegrid') + + def setDependencies(self, serviceBase=None): + """Set external dependencies for the agent.""" + self.setService(serviceBase) + + async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: + """ + Process a task by focusing on required outputs and using AI to guide the analysis process. 
+ + Args: + task: Task dictionary with prompt, inputDocuments, outputSpecifications + + Returns: + Dictionary with feedback and documents + """ + try: + # Extract task information + prompt = task.get("prompt", "") + outputSpecs = task.get("outputSpecifications", []) + workflow = task.get("context", {}).get("workflow", {}) + + # Check AI service + if not self.service or not self.service.base: + return { + "feedback": "The Analyst agent requires an AI service to function effectively.", + "documents": [] + } + + # Create analysis plan + if workflow: + self.service.logAdd(workflow, "Extracting data from documents...", level="info", progress=35) + analysisPlan = await self._createAnalysisPlan(prompt) + + # Check if this is truly an analysis task + if not analysisPlan.get("requiresAnalysis", True): + return { + "feedback": "This task doesn't appear to require analysis. Please try a different agent.", + "documents": [] + } + + # Analyze data + if workflow: + self.service.logAdd(workflow, "Analyzing task requirements...", level="info", progress=45) + analysisResults = await self._analyzeData(task, analysisPlan) + + # Format results into requested output documents + totalSpecs = len(outputSpecs) + for i, spec in enumerate(outputSpecs): + progress = 50 + int((i / totalSpecs) * 40) # Progress from 50% to 90% + self.service.logAdd(workflow, f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress) + + documents = await self._createOutputDocuments( + prompt, + analysisResults, + outputSpecs, + analysisPlan + ) + + # Generate feedback + feedback = analysisPlan.get("feedback", f"I analyzed '{prompt[:50]}...' 
and generated {len(documents)} output documents.") + + return { + "feedback": feedback, + "documents": documents + } + + except Exception as e: + logger.error(f"Error during analysis: {str(e)}", exc_info=True) + return { + "feedback": f"Error during analysis: {str(e)}", + "documents": [] + } + + def _extractData(self, documents: List[Dict[str, Any]]) -> tuple: + """ + Extract data from documents, focusing on dataExtracted fields. + + Args: + documents: List of input documents + + Returns: + Tuple of (datasets dictionary, document context text) + """ + datasets = {} + documentContext = "" + + # Process each document + for doc in documents: + docName = doc.get("name", "unnamed") + if doc.get("ext"): + docName = f"{docName}.{doc.get('ext')}" + + documentContext += f"\n\n--- {docName} ---\n" + + # Process contents + for content in doc.get("contents", []): + # Focus only on dataExtracted + if content.get("dataExtracted"): + extractedText = content.get("dataExtracted", "") + documentContext += extractedText + + # Try to parse as structured data if appropriate + if docName.lower().endswith(('.csv', '.tsv')): + try: + df = pd.read_csv(io.StringIO(extractedText)) + datasets[docName] = df + except: + pass + elif docName.lower().endswith('.json'): + try: + jsonData = json.loads(extractedText) + if isinstance(jsonData, list): + df = pd.DataFrame(jsonData) + datasets[docName] = df + elif isinstance(jsonData, dict): + # Handle nested JSON structures + if any(isinstance(v, list) for v in jsonData.values()): + for key, value in jsonData.items(): + if isinstance(value, list) and len(value) > 0: + df = pd.DataFrame(value) + datasets[f"{docName}:{key}"] = df + else: + df = pd.DataFrame([jsonData]) + datasets[docName] = df + except: + pass + + # Try to detect tabular data in text content + if docName not in datasets and len(extractedText.splitlines()) > 2: + lines = extractedText.splitlines() + if any(',' in line for line in lines[:5]): + try: + df = 
pd.read_csv(io.StringIO(extractedText)) + if len(df.columns) > 1: + datasets[docName] = df + except: + pass + elif any('\t' in line for line in lines[:5]): + try: + df = pd.read_csv(io.StringIO(extractedText), sep='\t') + if len(df.columns) > 1: + datasets[docName] = df + except: + pass + + return datasets, documentContext + + async def _analyzeTask(self, prompt: str, documentContext: str, datasets: Dict[str, Any], outputSpecs: List[Dict[str, Any]]) -> Dict[str, Any]: + """ + Analyze the task requirements using AI. + + Args: + prompt: The task prompt + documentContext: Context from input documents + datasets: Available datasets + outputSpecs: Output specifications + + Returns: + Analysis plan dictionary + """ + # Create analysis prompt + analysisPrompt = f""" + Analyze this data analysis task and create a detailed plan: + + TASK: {prompt} + + DOCUMENT CONTEXT: + {documentContext} + + AVAILABLE DATASETS: + {json.dumps(datasets, indent=2)} + + REQUIRED OUTPUTS: + {json.dumps(outputSpecs, indent=2)} + + Create a detailed analysis plan in JSON format with: + {{ + "analysisSteps": [ + {{ + "step": "step description", + "purpose": "why this step is needed", + "datasets": ["dataset1", "dataset2"], + "techniques": ["technique1", "technique2"], + "outputs": ["output1", "output2"] + }} + ], + "visualizations": [ + {{ + "type": "visualization type", + "purpose": "what it shows", + "datasets": ["dataset1"], + "settings": {{"key": "value"}} + }} + ], + "insights": [ + {{ + "type": "insight type", + "description": "what to look for", + "datasets": ["dataset1"] + }} + ], + "feedback": "explanation of the analysis approach" + }} + + Respond with ONLY the JSON object, no additional text or explanations. + """ + + try: + # Get analysis plan from AI + response = await self.service.base.callAi([ + {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. 
Respond with valid JSON only."}, + {"role": "user", "content": analysisPrompt} + ], produceUserAnswer=True) + + # Extract JSON + jsonStart = response.find('{') + jsonEnd = response.rfind('}') + 1 + + if jsonStart >= 0 and jsonEnd > jsonStart: + plan = json.loads(response[jsonStart:jsonEnd]) + return plan + else: + # Fallback plan + logger.warning(f"Not able creating analysis plan, generating fallback plan") + return { + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "datasets": list(datasets.keys()), + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "datasets": list(datasets.keys()), + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data", + "datasets": list(datasets.keys()) + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } + + except Exception as e: + logger.warning(f"Error creating analysis plan: {str(e)}") + # Simple fallback plan + return { + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "datasets": list(datasets.keys()), + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "datasets": list(datasets.keys()), + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data", + "datasets": list(datasets.keys()) + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } + + async def _createAnalysisPlan(self, prompt: str) -> Dict[str, Any]: + """ + Create an analysis plan based on 
the task prompt. + + Args: + prompt: The task prompt + + Returns: + Analysis plan dictionary + """ + try: + # Create analysis prompt + analysisPrompt = f""" + Analyze this data analysis task and create a detailed plan: + + TASK: {prompt} + + Create a detailed analysis plan in JSON format with: + {{ + "requiresAnalysis": true/false, + "analysisSteps": [ + {{ + "step": "step description", + "purpose": "why this step is needed", + "techniques": ["technique1", "technique2"], + "outputs": ["output1", "output2"] + }} + ], + "visualizations": [ + {{ + "type": "visualization type", + "purpose": "what it shows", + "settings": {{"key": "value"}} + }} + ], + "insights": [ + {{ + "type": "insight type", + "description": "what to look for" + }} + ], + "feedback": "explanation of the analysis approach" + }} + + Respond with ONLY the JSON object, no additional text or explanations. + """ + + # Get analysis plan from AI + response = await self.service.base.callAi([ + {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. 
Respond with valid JSON only."}, + {"role": "user", "content": analysisPrompt} + ], produceUserAnswer=True) + + # Extract JSON + jsonStart = response.find('{') + jsonEnd = response.rfind('}') + 1 + + if jsonStart >= 0 and jsonEnd > jsonStart: + plan = json.loads(response[jsonStart:jsonEnd]) + return plan + else: + # Fallback plan + logger.warning(f"Not able creating analysis plan, generating fallback plan") + return { + "requiresAnalysis": True, + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data" + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } + + except Exception as e: + logger.warning(f"Error creating analysis plan: {str(e)}") + # Simple fallback plan + return { + "requiresAnalysis": True, + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data" + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } + + async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str, + analysisPlan: Dict, description: str) -> Dict: + """ + Create a visualization based on the analysis plan. 
+ + Args: + datasets: Dictionary of datasets + prompt: Original task prompt + outputLabel: Output file label + analysisPlan: Analysis plan + description: Output description + + Returns: + Document dictionary with visualization + """ + try: + # Get visualization recommendations + vizRecommendations = analysisPlan.get("visualizations", []) + + if not vizRecommendations: + # Generate visualization recommendations if none provided + self.service.base.logAdd(analysisPlan.get("workflowId"), "Generating visualization recommendations...", level="info", progress=50) + vizPrompt = f""" + Based on this data and task, recommend appropriate visualizations. + + TASK: {prompt} + DESCRIPTION: {description} + + DATASETS: + {json.dumps({name: {"shape": df.shape, "columns": df.columns.tolist()} + for name, df in datasets.items()}, indent=2)} + + Recommend visualizations in JSON format: + {{ + "visualizations": [ + {{ + "type": "chart_type", + "dataSource": "dataset_name", + "variables": ["col1", "col2"], + "purpose": "explanation" + }} + ] + }} + """ + + response = await self.service.base.callAi([ + {"role": "system", "content": "You are a data visualization expert. 
Recommend appropriate visualizations based on the data and task."}, + {"role": "user", "content": vizPrompt} + ]) + + # Extract JSON + jsonStart = response.find('{') + jsonEnd = response.rfind('}') + 1 + + if jsonStart >= 0 and jsonEnd > jsonStart: + vizData = json.loads(response[jsonStart:jsonEnd]) + vizRecommendations = vizData.get("visualizations", []) + + # Determine format from filename + formatType = outputLabel.split('.')[-1].lower() + if formatType not in ['png', 'jpg', 'jpeg', 'svg']: + formatType = 'png' + + # If no datasets available, create error message image + if not datasets: + plt.figure(figsize=(10, 6)) + plt.text(0.5, 0.5, "No data available for visualization", + ha='center', va='center', fontsize=14) + plt.tight_layout() + imgData = self._getImageBase64(formatType) + plt.close() + + return { + "label": outputLabel, + "content": imgData, + "metadata": { + "contentType": f"image/{formatType}" + } + } + + # Prepare dataset info for the first dataset if none specified + if not vizRecommendations and datasets: + name, df = next(iter(datasets.items())) + vizRecommendations = [{ + "type": "auto", + "dataSource": name, + "variables": df.columns.tolist()[:5], + "purpose": "general analysis" + }] + + # Create visualization code prompt + vizPrompt = f""" + Generate Python matplotlib/seaborn code to create a visualization for: + + TASK: {prompt} + + VISUALIZATION REQUIREMENTS: + - Output format: {formatType} + - Filename: {outputLabel} + - Description: {description} + + RECOMMENDED VISUALIZATION: + {json.dumps(vizRecommendations, indent=2)} + + AVAILABLE DATASETS: + """ + + # Add dataset info for recommended sources + for viz in vizRecommendations: + dataSource = viz.get("dataSource") + if dataSource in datasets: + df = datasets[dataSource] + vizPrompt += f"\nDataset '{dataSource}':\n" + vizPrompt += f"- Shape: {df.shape}\n" + vizPrompt += f"- Columns: {df.columns.tolist()}\n" + vizPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n" + + 
vizPrompt += """ + Generate ONLY Python code that: + 1. Uses matplotlib and/or seaborn to create a clear visualization + 2. Sets figure size to (10, 6) + 3. Includes appropriate titles, labels, and legend + 4. Uses professional color schemes + 5. Handles any missing data gracefully + + Return ONLY executable Python code, no explanations or markdown. + """ + + try: + # Get visualization code from AI + vizCode = await self.service.base.callAi([ + {"role": "system", "content": "You are a data visualization expert. Provide only executable Python code."}, + {"role": "user", "content": vizPrompt} + ], produceUserAnswer = True) + + # Clean code + vizCode = vizCode.replace("```python", "").replace("```", "").strip() + + # Execute visualization code + plt.figure(figsize=(10, 6)) + + # Make local variables available to the code + localVars = { + "plt": plt, + "sns": sns, + "pd": pd, + "np": __import__('numpy') + } + + # Add datasets to local variables + for name, df in datasets.items(): + # Create a sanitized variable name + varName = ''.join(c if c.isalnum() else '_' for c in name) + localVars[varName] = df + + # Also add with standard names for simpler code + if "df" not in localVars: + localVars["df"] = df + elif "df2" not in localVars: + localVars["df2"] = df + + # Execute the visualization code + exec(vizCode, globals(), localVars) + + # Capture the image + imgData = self._getImageBase64(formatType) + plt.close() + + return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}") + + except Exception as e: + logger.error(f"Error creating visualization: {str(e)}", exc_info=True) + + # Create error message image + plt.figure(figsize=(10, 6)) + plt.text(0.5, 0.5, f"Visualization error: {str(e)}", + ha='center', va='center', fontsize=12) + plt.tight_layout() + imgData = self._getImageBase64(formatType) + plt.close() + + return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}") + + except Exception as e: + logger.error(f"Error 
creating visualization: {str(e)}", exc_info=True) + + # Create error message image + plt.figure(figsize=(10, 6)) + plt.text(0.5, 0.5, f"Visualization error: {str(e)}", + ha='center', va='center', fontsize=12) + plt.tight_layout() + imgData = self._getImageBase64(formatType) + plt.close() + + return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}") + + async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str, + analysisPlan: Dict, description: str) -> ChatContent: + """ + Create a data document (CSV, JSON, Excel) from analysis results. + + Args: + datasets: Dictionary of datasets + prompt: Original task prompt + outputLabel: Output filename + analysisPlan: Analysis plan + description: Output description + + Returns: + ChatContent object + """ + try: + # Determine format from filename + formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "csv" + + # Process data based on format + if formatType == "csv": + result = self._convertToCsv(datasets) + elif formatType == "json": + result = json.dumps(datasets, indent=2) + elif formatType == "xlsx": + result = self._convertToExcel(datasets) + else: + result = str(datasets) + + # Determine content type + contentType = "text/csv" if formatType == "csv" else \ + "application/json" if formatType == "json" else \ + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" if formatType == "xlsx" else \ + "text/plain" + + return self.formatAgentDocumentOutput(outputLabel, result, contentType) + + except Exception as e: + logger.error(f"Error creating data document: {str(e)}", exc_info=True) + + errorContent = f"Error generating {formatType} document: {str(e)}" + return self.formatAgentDocumentOutput(outputLabel, errorContent, "text/plain") + + async def _createTextDocument(self, datasets: Dict, context: str, prompt: str, + outputLabel: str, formatType: str, + analysisPlan: Dict, description: str) -> ChatContent: + """ + Create a text document 
(markdown, HTML, text) from analysis results. + + Args: + datasets: Dictionary of datasets + context: Document context + prompt: Original task prompt + outputLabel: Output filename + formatType: Output format + analysisPlan: Analysis plan + description: Output description + + Returns: + ChatContent object + """ + try: + # Generate dataset summaries + datasetSummaries = [] + for name, df in datasets.items(): + summary = f"\nDataset: {name}\n" + summary += f"Shape: {df.shape}\n" + summary += f"Columns: {', '.join(df.columns)}\n" + if not df.empty: + summary += f"Sample data:\n{df.head(3).to_string()}\n" + datasetSummaries.append(summary) + + # Generate analysis prompt + analysisPrompt = f""" + Create a detailed {formatType} document for: + + TASK: {prompt} + + OUTPUT REQUIREMENTS: + - Format: {formatType} + - Filename: {outputLabel} + - Description: {description} + + ANALYSIS CONTEXT: + {json.dumps(analysisPlan, indent=2)} + + DATASET SUMMARIES: + {"".join(datasetSummaries)} + + DOCUMENT CONTEXT: + {context[:2000]}... (truncated) + + Create a comprehensive, professional analysis document that addresses the task requirements. + The document should: + 1. Have a clear structure with headings and sections + 2. Include relevant data findings and insights + 3. Provide appropriate interpretations and recommendations + 4. Format the content according to the required output format + + Your response should be the complete document content in the specified format. + """ + + # Get document content from AI + documentContent = await self.service.base.callAi([ + {"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."}, + {"role": "user", "content": analysisPrompt} + ], produceUserAnswer = True) + + # Clean HTML or Markdown if needed + if formatType in ["md", "markdown"] and not documentContent.strip().startswith("#"): + documentContent = f"# Analysis Report\n\n{documentContent}" + elif formatType == "html" and not "
{documentContent}" + + # Determine content type + contentType = "text/markdown" if formatType in ["md", "markdown"] else \ + "text/html" if formatType == "html" else \ + "text/plain" + + return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType) + + except Exception as e: + logger.error(f"Error creating text document: {str(e)}", exc_info=True) + + # Create a simple error document + if formatType in ["md", "markdown"]: + content = f"# Error in Analysis\n\nThere was an error generating the analysis: {str(e)}" + elif formatType == "html": + content = f"There was an error generating the analysis: {str(e)}
" + else: + content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}" + + return self.formatAgentDocumentOutput(outputLabel, content, contentType) + + def _getImageBase64(self, formatType: str = 'png') -> str: + """ + Convert current matplotlib figure to base64 string. + + Args: + formatType: Image format + + Returns: + Base64 encoded string of the image + """ + buffer = io.BytesIO() + plt.savefig(buffer, format=formatType, dpi=100) + buffer.seek(0) + imageData = buffer.getvalue() + buffer.close() + + # Convert to base64 + return base64.b64encode(imageData).decode('utf-8') + + async def _analyzeData(self, task: Dict[str, Any], analysisPlan: Dict[str, Any]) -> Dict[str, Any]: + """ + Analyze data based on the analysis plan. + + Args: + task: Task dictionary with input documents and specifications + analysisPlan: Analysis plan from _createAnalysisPlan + + Returns: + Analysis results dictionary + """ + try: + # Extract data from input documents + inputDocuments = task.get("inputDocuments", []) + datasets, documentContext = self._extractData(inputDocuments) + + # Get task information + prompt = task.get("prompt", "") + outputSpecs = task.get("outputSpecifications", []) + + # Analyze task requirements + analysisResults = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs) + + # Add datasets and context to results + analysisResults["datasets"] = datasets + analysisResults["documentContext"] = documentContext + + return analysisResults + + except Exception as e: + logger.error(f"Error analyzing data: {str(e)}", exc_info=True) + return { + "error": str(e), + "datasets": {}, + "documentContext": "" + } + + async def _createOutputDocuments(self, prompt: str, analysisResults: Dict[str, Any], + outputSpecs: List[Dict[str, Any]], analysisPlan: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Create output documents based on analysis results. 
+ + Args: + prompt: Original task prompt + analysisResults: Results from data analysis + outputSpecs: List of output specifications + analysisPlan: Analysis plan from _createAnalysisPlan + + Returns: + List of document objects + """ + documents = [] + datasets = analysisResults.get("datasets", {}) + documentContext = analysisResults.get("documentContext", "") + + # Process each output specification + for spec in outputSpecs: + outputLabel = spec.get("label", "") + outputDescription = spec.get("description", "") + + # Determine format from filename + formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt" + + try: + # Create appropriate document based on format + if formatType in ["png", "jpg", "jpeg", "svg"]: + # Visualization output + document = await self._createVisualization( + datasets, prompt, outputLabel, analysisPlan, outputDescription + ) + elif formatType in ["csv", "json", "xlsx"]: + # Data document output + document = await self._createDataDocument( + datasets, prompt, outputLabel, analysisPlan, outputDescription + ) + else: + # Text document output (markdown, html, text) + document = await self._createTextDocument( + datasets, documentContext, prompt, outputLabel, formatType, + analysisPlan, outputDescription + ) + + documents.append(document) + + except Exception as e: + logger.error(f"Error creating output document {outputLabel}: {str(e)}", exc_info=True) + # Create error document + errorDoc = self.formatAgentDocumentOutput( + outputLabel, + f"Error creating document: {str(e)}", + "text/plain" + ) + documents.append(errorDoc) + + return documents + + +# Factory function for the Analyst agent +def getAgentAnalyst(): + """Returns an instance of the Analyst agent.""" + return AgentAnalyst() \ No newline at end of file diff --git a/modules/interfaces/serviceChatClass.py b/modules/interfaces/serviceChatClass.py index dc7c823c..509eeb55 100644 --- a/modules/interfaces/serviceChatClass.py +++ b/modules/interfaces/serviceChatClass.py 
@@ -10,6 +10,7 @@ from datetime import datetime from typing import Dict, Any, List, Optional, Union import hashlib +import asyncio from modules.shared.mimeUtils import isTextMimeType from modules.interfaces.serviceChatAccess import ChatAccess @@ -842,6 +843,168 @@ class ChatInterface: logger.error(f"Error loading workflow state: {str(e)}") return None + # Workflow Actions + + async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: + """ + Starts a new workflow or continues an existing one. + Corresponds to State 1 in the state machine documentation. + + Args: + userInput: The user input request containing workflow initialization data + workflowId: Optional ID of an existing workflow to continue + + Returns: + ChatWorkflow object representing the started/continued workflow + """ + try: + # Get current timestamp + currentTime = self._getCurrentTimestamp() + + if workflowId: + # Continue existing workflow + workflow = self.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + + # Update workflow status + workflow.status = "running" + workflow.lastActivity = currentTime + + # Update in database + self.updateWorkflow(workflowId, { + "status": "running", + "lastActivity": currentTime + }) + + # Add log entry + self.createWorkflowLog({ + "workflowId": workflowId, + "message": "Workflow continued", + "type": "info", + "status": "running", + "progress": 0 + }) + + else: + # Create new workflow + workflowData = { + "name": userInput.name or "New Workflow", + "status": "running", + "startedAt": currentTime, + "lastActivity": currentTime, + "currentRound": 1, + "mandateId": self.mandateId, + "messageIds": [], + "dataStats": { + "totalMessages": 0, + "totalDocuments": 0, + "totalTokens": 0 + } + } + + # Create workflow + workflow = self.createWorkflow(workflowData) + + # Add log entry + self.createWorkflowLog({ + "workflowId": workflow.id, + "message": "Workflow started", + 
"type": "info", + "status": "running", + "progress": 0 + }) + + # Start workflow processing + from modules.workflow.workflowManager import getWorkflowManager + workflowManager = await getWorkflowManager(self) + asyncio.create_task(workflowManager.workflowProcess(userInput, workflow)) + + return workflow + + except Exception as e: + logger.error(f"Error starting workflow: {str(e)}") + raise + + async def workflowStop(self, workflowId: str) -> ChatWorkflow: + """ + Stops a running workflow (State 8: Workflow Stopped). + + Args: + workflowId: ID of the workflow to stop + + Returns: + Updated ChatWorkflow object + """ + try: + # Load workflow state + workflow = self.getWorkflow(workflowId) + if not workflow: + raise ValueError(f"Workflow {workflowId} not found") + + # Update workflow status + workflow.status = "stopped" + workflow.lastActivity = self._getCurrentTimestamp() + + # Update in database + self.updateWorkflow(workflowId, { + "status": "stopped", + "lastActivity": workflow.lastActivity + }) + + # Add log entry + self.createWorkflowLog({ + "workflowId": workflowId, + "message": "Workflow stopped", + "type": "warning", + "status": "stopped", + "progress": 100 + }) + + return workflow + + except Exception as e: + logger.error(f"Error stopping workflow: {str(e)}") + raise + + async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: + """ + Process multiple files and extract their contents. 
+ + Args: + fileIds: List of file IDs to process + + Returns: + List of ChatDocument objects + """ + documents = [] + for fileId in fileIds: + try: + # Get file content + fileContent = self.service.functions.getFileData(fileId) + if not fileContent: + continue + + # Get file metadata + fileMetadata = self.service.functions.getFile(fileId) + if not fileMetadata: + continue + + # Create ChatDocument + document = ChatDocument( + id=str(uuid.uuid4()), + fileId=fileId, + filename=fileMetadata.get("name", "Unknown"), + fileSize=fileMetadata.get("size", 0), + content=fileContent.decode('utf-8', errors='ignore'), + mimeType=fileMetadata.get("mimeType", "text/plain") + ) + documents.append(document) + except Exception as e: + logger.error(f"Error processing file {fileId}: {str(e)}") + continue + return documents + def getInterface(currentUser: Optional[User] = None) -> 'ChatInterface': """ diff --git a/modules/interfaces/serviceChatModel.py b/modules/interfaces/serviceChatModel.py index a37d0909..db01cd62 100644 --- a/modules/interfaces/serviceChatModel.py +++ b/modules/interfaces/serviceChatModel.py @@ -319,4 +319,50 @@ register_model_labels( "lastActive": {"en": "Last Active", "fr": "Dernière activité"}, "stats": {"en": "Statistics", "fr": "Statistiques"} } +) + +class AgentHandover(BaseModel, ModelMixin): + """Data model for agent handover information.""" + # Status values + status: str = Field(default="pending", description="One of: pending, success, failed, retry") + error: Optional[str] = Field(None, description="Error message if any") + progress: float = Field(default=0.0, description="Progress percentage") + + # Document information + documentsUserInitial: List[Dict[str, Any]] = Field(default_factory=list, description="Initial user documents") + documentsInput: List[Dict[str, Any]] = Field(default_factory=list, description="Input documents") + documentsOutput: List[Dict[str, Any]] = Field(default_factory=list, description="Output documents") + + # Prompt information 
+ promptUserInitial: str = Field(default="", description="Initial user prompt") + promptFromFinishedAgent: str = Field(default="", description="Prompt from finished agent") + promptForNextAgent: str = Field(default="", description="Prompt for next agent") + + # Agent information + currentAgent: Optional[str] = Field(None, description="Current agent name") + nextAgent: Optional[str] = Field(None, description="Next agent name") + + # Timing information + startedAt: Optional[str] = Field(None, description="Start timestamp") + finishedAt: Optional[str] = Field(None, description="Finish timestamp") + +# Register labels for AgentHandover +register_model_labels( + "AgentHandover", + {"en": "Agent Handover", "fr": "Transfert d'agent"}, + { + "status": {"en": "Status", "fr": "Statut"}, + "error": {"en": "Error", "fr": "Erreur"}, + "progress": {"en": "Progress", "fr": "Progression"}, + "documentsUserInitial": {"en": "Initial User Documents", "fr": "Documents utilisateur initiaux"}, + "documentsInput": {"en": "Input Documents", "fr": "Documents d'entrée"}, + "documentsOutput": {"en": "Output Documents", "fr": "Documents de sortie"}, + "promptUserInitial": {"en": "Initial User Prompt", "fr": "Invite utilisateur initiale"}, + "promptFromFinishedAgent": {"en": "Finished Agent Prompt", "fr": "Invite de l'agent terminé"}, + "promptForNextAgent": {"en": "Next Agent Prompt", "fr": "Invite pour le prochain agent"}, + "currentAgent": {"en": "Current Agent", "fr": "Agent actuel"}, + "nextAgent": {"en": "Next Agent", "fr": "Prochain agent"}, + "startedAt": {"en": "Started At", "fr": "Démarré le"}, + "finishedAt": {"en": "Finished At", "fr": "Terminé le"} + } ) \ No newline at end of file diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index bd97f855..81e6e123 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -59,132 +59,51 @@ def createServiceContainer(currentUser: Dict[str, Any]): return service -# API Endpoint for 
getting all workflows -@router.get("/list", response_model=List[ChatWorkflow]) +# Consolidated endpoint for getting all workflows +@router.get("/", response_model=List[ChatWorkflow]) @limiter.limit("30/minute") -async def list_workflows( +async def get_workflows( request: Request, currentUser: User = Depends(getCurrentUser) ) -> List[ChatWorkflow]: - """List all workflows for the current user.""" + """Get all workflows for the current user.""" try: appInterface = getInterface(currentUser) return appInterface.getAllWorkflows() except Exception as e: - logger.error(f"Error listing workflows: {str(e)}") + logger.error(f"Error getting workflows: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to list workflows: {str(e)}" + detail=f"Failed to get workflows: {str(e)}" ) -# State 1: Workflow Initialization endpoint -@router.post("/start", response_model=ChatWorkflow) -@limiter.limit("10/minute") -async def start_workflow( +@router.get("/{workflowId}", response_model=ChatWorkflow) +@limiter.limit("30/minute") +async def get_workflow( request: Request, - workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), - userInput: UserInputRequest = Body(...), + workflowId: str = Path(..., description="ID of the workflow"), currentUser: User = Depends(getCurrentUser) ) -> ChatWorkflow: - """ - Starts a new workflow or continues an existing one. - Corresponds to State 1 in the state machine documentation. 
- """ + """Get workflow by ID""" try: - # Get service container - service = createServiceContainer(currentUser) + # Get workflow interface with current user context + workflowInterface = getInterface(currentUser) - # Get workflow manager - workflowManager = await getWorkflowManager(service) - - # Start or continue workflow - workflow = await workflowManager.workflowStart(userInput, workflowId) - - return ChatWorkflow(**workflow) - - except Exception as e: - logger.error(f"Error in start_workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -# State 8: Workflow Stopped endpoint -@router.post("/{workflowId}/stop", response_model=ChatWorkflow) -@limiter.limit("10/minute") -async def stop_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to stop"), - currentUser: User = Depends(getCurrentUser) -) -> ChatWorkflow: - """Stops a running workflow.""" - try: - # Get service container - service = createServiceContainer(currentUser) - - # Get workflow manager - workflowManager = await getWorkflowManager(service) - - # Stop workflow - workflow = await workflowManager.workflowStop(workflowId) - - return ChatWorkflow(**workflow) - - except Exception as e: - logger.error(f"Error in stop_workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=str(e) - ) - -# State 11: Workflow Reset/Deletion endpoint -@router.delete("/{workflowId}", response_model=Dict[str, Any]) -@limiter.limit("10/minute") -async def delete_workflow( - request: Request, - workflowId: str = Path(..., description="ID of the workflow to delete"), - currentUser: User = Depends(getCurrentUser) -) -> Dict[str, Any]: - """Deletes a workflow and its associated data.""" - try: - # Get service container - service = createServiceContainer(currentUser) - - # Verify workflow exists - workflow = service.base.getWorkflow(workflowId) + # Get workflow + workflow = 
workflowInterface.getWorkflow(workflowId) if not workflow: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, - detail=f"Workflow with ID {workflowId} not found" + detail="Workflow not found" ) + + return workflow - # Check if user has permission to delete - if workflow.get("_userId") != currentUser["id"]: - raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to delete this workflow" - ) - - # Delete workflow - success = service.base.deleteWorkflow(workflowId) - - if not success: - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to delete workflow" - ) - - return { - "id": workflowId, - "message": "Workflow and associated data deleted successfully" - } - except HTTPException: - raise except Exception as e: - logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + logger.error(f"Error getting workflow: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Error deleting workflow: {str(e)}" + detail=f"Failed to get workflow: {str(e)}" ) # API Endpoint for workflow status @@ -304,6 +223,111 @@ async def get_workflow_messages( detail=f"Error getting workflow messages: {str(e)}" ) +# State 1: Workflow Initialization endpoint +@router.post("/start", response_model=ChatWorkflow) +@limiter.limit("10/minute") +async def start_workflow( + request: Request, + workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"), + userInput: UserInputRequest = Body(...), + currentUser: User = Depends(getCurrentUser) +) -> ChatWorkflow: + """ + Starts a new workflow or continues an existing one. + Corresponds to State 1 in the state machine documentation. 
+ """ + try: + # Get service container + service = createServiceContainer(currentUser) + + # Start or continue workflow using ChatInterface + workflow = await service.functions.workflowStart(userInput, workflowId) + + return ChatWorkflow(**workflow) + + except Exception as e: + logger.error(f"Error in start_workflow: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) + +# State 8: Workflow Stopped endpoint +@router.post("/{workflowId}/stop", response_model=ChatWorkflow) +@limiter.limit("10/minute") +async def stop_workflow( + request: Request, + workflowId: str = Path(..., description="ID of the workflow to stop"), + currentUser: User = Depends(getCurrentUser) +) -> ChatWorkflow: + """Stops a running workflow.""" + try: + # Get service container + service = createServiceContainer(currentUser) + + # Stop workflow using ChatInterface + workflow = await service.functions.workflowStop(workflowId) + + return ChatWorkflow(**workflow) + + except Exception as e: + logger.error(f"Error in stop_workflow: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(e) + ) + +# State 11: Workflow Reset/Deletion endpoint +@router.delete("/{workflowId}", response_model=Dict[str, Any]) +@limiter.limit("10/minute") +async def delete_workflow( + request: Request, + workflowId: str = Path(..., description="ID of the workflow to delete"), + currentUser: User = Depends(getCurrentUser) +) -> Dict[str, Any]: + """Deletes a workflow and its associated data.""" + try: + # Get service container + service = createServiceContainer(currentUser) + + # Verify workflow exists + workflow = service.base.getWorkflow(workflowId) + if not workflow: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow with ID {workflowId} not found" + ) + + # Check if user has permission to delete + if workflow.get("_userId") != currentUser["id"]: + raise HTTPException( + 
status_code=status.HTTP_403_FORBIDDEN, + detail="You don't have permission to delete this workflow" + ) + + # Delete workflow + success = service.base.deleteWorkflow(workflowId) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Failed to delete workflow" + ) + + return { + "id": workflowId, + "message": "Workflow and associated data deleted successfully" + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {str(e)}", exc_info=True) + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Error deleting workflow: {str(e)}" + ) + + # Document Management Endpoints @router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any]) @@ -401,54 +425,3 @@ async def delete_file_from_message( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error deleting file reference: {str(e)}" ) - -@router.get("/workflows", response_model=List[ChatWorkflow]) -@limiter.limit("30/minute") -async def get_workflows( - request: Request, - currentUser: User = Depends(getCurrentUser) -) -> List[ChatWorkflow]: - """Get all workflows for current user""" - try: - # Get workflow interface with current user context - workflowInterface = getInterface(currentUser) - - # Get workflows - workflows = workflowInterface.getWorkflows() - return workflows - - except Exception as e: - logger.error(f"Error getting workflows: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflows: {str(e)}" - ) - -@router.get("/workflows/{workflow_id}", response_model=ChatWorkflow) -@limiter.limit("30/minute") -async def get_workflow( - request: Request, - workflow_id: str, - currentUser: User = Depends(getCurrentUser) -) -> ChatWorkflow: - """Get workflow by ID""" - try: - # Get workflow interface with current user context - workflowInterface = getInterface(currentUser) - - # Get workflow - workflow 
= workflowInterface.getWorkflow(workflow_id) - if not workflow: - raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="Workflow not found" - ) - - return workflow - - except Exception as e: - logger.error(f"Error getting workflow: {str(e)}") - raise HTTPException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f"Failed to get workflow: {str(e)}" - ) diff --git a/modules/workflow/agentManager.py b/modules/workflow/agentManager.py index fba60ac5..91bae403 100644 --- a/modules/workflow/agentManager.py +++ b/modules/workflow/agentManager.py @@ -1,22 +1,20 @@ """ -Agent Manager Module for managing, initializing, and executing agents. +Agent Manager Module for managing agent operations and execution. """ import os import logging import importlib -import asyncio from typing import Dict, Any, List, Optional, Tuple from datetime import datetime, UTC -from modules.workflow.agentBase import AgentBase -from modules.interfaces.serviceChatModel import AgentResponse, Task, ChatMessage import uuid -from modules.workflow.taskManager import getTaskManager - +from modules.interfaces.serviceChatModel import ( + ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow, AgentResponse +) logger = logging.getLogger(__name__) class AgentManager: - """Central manager for all agents in the system, handling registration, initialization, and execution.""" + """Manager for agent operations and execution.""" _instance = None @@ -26,58 +24,36 @@ class AgentManager: if cls._instance is None: cls._instance = cls() return cls._instance - + + # Internal Methods + def __init__(self): """Initialize the agent manager.""" if AgentManager._instance is not None: raise RuntimeError("Singleton instance already exists - use getInstance()") - self.agents: Dict[str, AgentBase] = {} self.service = None - self.taskManager = getTaskManager() - self._loadAgents() - - def initialize(self, service=None): - """Initialize or update the manager with service references.""" - if service: 
- # Validate required interfaces - required_interfaces = ['base', 'msft', 'google'] - missing_interfaces = [] - for interface in required_interfaces: - if not hasattr(service, interface): - missing_interfaces.append(interface) - - if missing_interfaces: - logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}") - return False - - self.service = service - - # Initialize agents with service - for agent in self.agents.values(): - if service and hasattr(agent, 'setService'): - agent.setService(service) - - return True + self.agents = {} # Dictionary to store agent instances + self._loadAgents() # Load agents on initialization def _loadAgents(self): - """Load all available agents from modules.""" + """Load all available agents from modules dynamically.""" logger.info("Loading agent modules...") - # List of agent modules to load - agentModules = [] + # Get the agents directory path agentDir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "agents") - # Search the directory for agent modules + # Search for agent modules + agentModules = [] for filename in os.listdir(agentDir): if filename.startswith("agent") and filename.endswith(".py"): - agentModules.append(filename[0:-3]) # Remove .py extension + agentModules.append(filename[:-3]) # Remove .py extension if not agentModules: - logger.warning("No agent modules found") + logger.warning("No agent modules found in directory: %s", agentDir) return - logger.info(f"{len(agentModules)} agent modules found") + logger.info(f"Found {len(agentModules)} agent modules: {', '.join(agentModules)}") # Load each agent module for moduleName in agentModules: @@ -85,63 +61,81 @@ class AgentManager: # Import the module module = importlib.import_module(f"modules.agents.{moduleName}") - # Look for agent class or get_*_agent function + # Extract agent name from module name agentName = moduleName.split("agent")[-1] className = f"Agent{agentName}" getterName = f"getAgent{agentName}" agent = None 
- # Try to get the agent via the get*Agent function + # Try to get the agent via the getter function first if hasattr(module, getterName): getterFunc = getattr(module, getterName) agent = getterFunc() logger.info(f"Agent '{agent.name}' loaded via {getterName}()") - # Alternatively, try to instantiate the agent directly + # If no getter, try to instantiate the agent class directly elif hasattr(module, className): agentClass = getattr(module, className) agent = agentClass() - logger.info(f"Agent '{agent.name}' directly instantiated") + logger.info(f"Agent '{agent.name}' directly instantiated from {className}") if agent: # Register the agent - self.registerAgent(agent) + if self._registerAgent(agent): + logger.info(f"Successfully registered agent: {agent.name}") + else: + logger.error(f"Failed to register agent from module: {moduleName}") else: - logger.warning(f"No agent class or getter function found in module {moduleName}") + logger.warning(f"No agent class or getter function found in module: {moduleName}") except ImportError as e: - logger.error(f"Module {moduleName} could not be imported: {e}") + logger.error(f"Failed to import module {moduleName}: {str(e)}") except Exception as e: - logger.error(f"Error loading agent from module {moduleName}: {e}") - - def registerAgent(self, agent: AgentBase): + logger.error(f"Error loading agent from module {moduleName}: {str(e)}") + + def _registerAgent(self, agent: Any): + """Register a new agent with the manager.""" + if not hasattr(agent, 'name'): + logger.error("Agent must have a name attribute") + return False + + self.agents[agent.name] = agent + if self.service and hasattr(agent, 'setService'): + agent.setService(self.service) + + return True + + # Public Methods + + def initialize(self, service: Any): + """Initialize the manager with service reference.""" + # Store service reference + self.service = service + + # Initialize agents with service + for agent in self.agents.values(): + if hasattr(agent, 'setService'): + 
agent.setService(service) + + return True + + def getAgent(self, agentIdentifier: str) -> Optional[Any]: """ - Register an agent in the manager. + Get an agent instance by its identifier. Args: - agent: The agent to register - """ - agentId = getattr(agent, 'name', "unknown_agent") - self.agents[agentId] = agent - logger.debug(f"Agent '{agent.name}' registered") - - def getAgent(self, agentIdentifier: str) -> Optional[AgentBase]: - """ - Return an agent instance. - - Args: - agentIdentifier: ID or type of the desired agent + agentIdentifier: Name or identifier of the agent Returns: - Agent instance or None if not found + Agent instance if found, None otherwise """ - if agentIdentifier in self.agents: - return self.agents[agentIdentifier] - logger.error(f"Agent with identifier '{agentIdentifier}' not found") - return None + agent = self.agents.get(agentIdentifier) + if not agent: + logger.warning(f"Agent '{agentIdentifier}' not found") + return agent - def getAllAgents(self) -> Dict[str, AgentBase]: + def getAllAgents(self) -> Dict[str, Any]: """ Get all registered agents. @@ -151,120 +145,67 @@ class AgentManager: return self.agents.copy() def getAgentInfos(self) -> List[Dict[str, Any]]: - """Return information about all registered agents.""" - agentInfos = [] - seenAgents = set() - for agent in self.agents.values(): - if agent not in seenAgents: - agentInfos.append(agent.getAgentInfo()) - seenAgents.add(agent) - return agentInfos - - async def executeAgent(self, task: Task) -> Tuple[AgentResponse, Task]: + """Get information about all registered agents.""" + return [ + { + 'name': agent.name, + 'description': getattr(agent, 'description', ''), + 'capabilities': getattr(agent, 'capabilities', []), + 'inputTypes': getattr(agent, 'inputTypes', []), + 'outputTypes': getattr(agent, 'outputTypes', []) + } + for agent in self.agents.values() + ] + + async def executeAgent(self, handover: Any) -> AgentResponse: """ - Execute an agent for a given task. 
+ Execute an agent with the given handover. Args: - task: The task to execute + handover: Handover object containing agent execution context Returns: - Tuple of (AgentResponse, updated Task) + AgentResponse object with execution results """ - agent = self.getAgent(task.agentName) - if not agent: - error_msg = f"Agent '{task.agentName}' not found" - logger.error(error_msg) - return ( - AgentResponse( - success=False, - message=ChatMessage( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - agentName=task.agentName, - message=error_msg, - role="system", - status="error", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=False - ), - performance={}, - progress=0.0 - ), - Task(**{**task.to_dict(), "status": "failed", "error": error_msg}) - ) - try: - # Update task status - task = self.taskManager.updateTaskStatus(task, "running") - task.startedAt = datetime.now(UTC).isoformat() + # Get agent instance + agent = self.agents.get(handover.currentAgent) + if not agent: + raise ValueError(f"Agent {handover.currentAgent} not found") # Execute agent - startTime = datetime.now(UTC) - response = await agent.execute(task) - endTime = datetime.now(UTC) + response = await agent.execute(handover) - # Calculate performance metrics - duration = (endTime - startTime).total_seconds() - performance = { - "duration": duration, - "startTime": startTime.isoformat(), - "endTime": endTime.isoformat() - } + # Save output files if any + if response.message and response.message.documents: + self.service.document['agentOutputFilesSave'](handover, response.message.documents) - # Update task with result - task.status = "completed" if response.success else "failed" - task.finishedAt = endTime.isoformat() - task.result = response.message - task.progress = response.progress - task.performance = performance - - if not response.success: - task.error = response.message.message if response.message else "Unknown error" - - # Create 
response - response = AgentResponse( - success=response.success, - message=response.message, - performance=performance - ) - - # Update task status - if response.success: - task = self.taskManager.completeTask(task, response.message) - else: - task = self.taskManager.handleTaskError(task, response.message.message if response.message else "Unknown error") - - return response, task + return response except Exception as e: - error_msg = f"Error executing agent '{task.agentName}': {str(e)}" - logger.error(error_msg, exc_info=True) + logger.error(f"Error executing agent {handover.currentAgent}: {str(e)}") - # Create error response - error_response = AgentResponse( + # Create error message + errorMessage = ChatMessage( + id=str(uuid.uuid4()), + workflowId=handover.workflowId, + agentName=handover.currentAgent, + message=f"Error executing agent: {str(e)}", + role="system", + status="error", + sequenceNr=0, + startedAt=handover.startedAt, + finishedAt=datetime.now(UTC).isoformat(), + success=False + ) + + return AgentResponse( success=False, - message=ChatMessage( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - agentName=task.agentName, - message=error_msg, - role="system", - status="error", - sequenceNr=0, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=False - ), + message=errorMessage, + error=str(e), performance={}, progress=0.0 ) - - # Update task with error - task = self.taskManager.handleTaskError(task, error_msg) - - return error_response, task # Singleton factory for the agent manager def getAgentManager(): diff --git a/modules/workflow/chatManager.py b/modules/workflow/chatManager.py new file mode 100644 index 00000000..bb3e1262 --- /dev/null +++ b/modules/workflow/chatManager.py @@ -0,0 +1,617 @@ +""" +Chat Manager Module for managing chat workflows and agent handovers. 
+""" + +import logging +from typing import Dict, Any, List, Optional, Union +from datetime import datetime, UTC +import uuid +import json +from dataclasses import dataclass +from modules.interfaces.serviceChatModel import ( + ChatLog, ChatMessage, ChatDocument, UserInputRequest, ChatWorkflow, + AgentHandover +) +from modules.workflow.agentManager import getAgentManager +from modules.workflow.documentManager import getDocumentManager + +logger = logging.getLogger(__name__) + +class ChatManager: + """Manager for chat workflows and agent handovers.""" + + _instance = None + + @classmethod + def getInstance(cls): + """Return a singleton instance of the chat manager.""" + if cls._instance is None: + cls._instance = cls() + return cls._instance + + # Core functions + + def __init__(self): + """Initialize the chat manager.""" + if ChatManager._instance is not None: + raise RuntimeError("Singleton instance already exists - use getInstance()") + + self.service = None + self.agentManager = getAgentManager() + self.documentManager = getDocumentManager() + + def initialize(self, workflow: ChatWorkflow): + """ + Initialize the manager with an optional workflow object. 
+ + Args: + workflow: Optional ChatWorkflow object to initialize with + """ + # Initialize managers + self.agentManager.initialize(self.service) + self.documentManager.initialize(self.service) + + # Add basic references to service + self.service.workflow = workflow + self.service.logAdd = self.logAdd + + self.service.user = { + 'id': None, + 'name': None, + 'language': 'en' + } + self.service.functions = { + 'forEach': lambda items, action: [action(item) for item in items], + 'while': lambda condition, action: [action() for _ in iter(lambda: condition(), False)] + } + self.service.model = { + 'callAiBasic': self._callAiBasic, + 'callAiComplex': self._callAiComplex, + 'callAiImage': self._callAiImage + } + + # Initialize document operations + self.service.document = { + 'extract': self.documentManager.extractContent, + 'convertFileRefToFileId': self.documentManager.convertFileRefToId, + 'convertFileIdToFileRef': self.documentManager.convertFileIdToRef, + 'convertDataFormat': self.documentManager.convertDataFormat, + 'agentInputFilesCreate': self.documentManager.createAgentInputFileList, + 'agentOutputFilesSave': self.documentManager.saveAgentOutputFiles + } + + # Initialize data access + from modules.workflow.dataAccessFunctions import get_data_access + self.service.data = get_data_access().to_service_object() + + return True + + def createInitialHandover(self, userInput: UserInputRequest) -> AgentHandover: + """ + Create the initial handover object from user input. 
+ + Args: + userInput: User input request + + Returns: + Initial handover object + """ + try: + # Create initial handover + handover = AgentHandover( + promptUserInitial=userInput.message, + documentsUserInitial=userInput.listFileId or [], + startedAt=datetime.now(UTC).isoformat() + ) + + # Process user input documents + if handover.documentsUserInitial: + handover.documentsInput = handover.documentsUserInitial + + # Set initial prompt for next agent + handover.promptForNextAgent = handover.promptUserInitial + + return handover + + except Exception as e: + logger.error(f"Error creating initial handover: {str(e)}") + return AgentHandover(status="failed", error=str(e)) + + async def defineNextHandover(self, currentHandover: AgentHandover) -> Optional[AgentHandover]: + """ + Define the next handover object for agent transition. + + Args: + currentHandover: Current handover object + + Returns: + Next handover object or None if no next agent + """ + try: + # Get available agents + availableAgents = self.agentManager.getAgentInfos() + if not availableAgents: + logger.warning("No available agents found") + return None + + # Create next handover object + nextHandover = AgentHandover( + promptUserInitial=currentHandover.promptUserInitial, + documentsUserInitial=currentHandover.documentsUserInitial, + startedAt=datetime.now(UTC).isoformat() + ) + + # If this is the first handover, use initial documents + if not currentHandover.promptFromFinishedAgent: + nextHandover.documentsInput = currentHandover.documentsUserInitial + nextHandover.promptForNextAgent = currentHandover.promptUserInitial + else: + # Use output documents from previous agent + nextHandover.documentsInput = currentHandover.documentsOutput + nextHandover.promptForNextAgent = currentHandover.promptFromFinishedAgent + + # Select next agent based on available agents and current state + nextAgent = await self._selectNextAgent(availableAgents, nextHandover) + if not nextAgent: + logger.info("No suitable next agent 
found") + return None + + nextHandover.nextAgent = nextAgent['name'] + return nextHandover + + except Exception as e: + logger.error(f"Error defining next handover: {str(e)}") + return None + + async def _selectNextAgent(self, availableAgents: List[Dict[str, Any]], handover: AgentHandover) -> Optional[Dict[str, Any]]: + """ + Select the next agent using AI analysis of the current state and requirements. + + Args: + availableAgents: List of available agents + handover: Current handover object + + Returns: + Selected agent or None if no suitable agent + """ + try: + if not availableAgents: + logger.warning("No available agents found") + return None + + # Get current workflow state + workflow = self.service.workflow + if not workflow: + logger.error("No workflow context available") + return None + + # Detect user language if not already set + if not workflow.userLanguage: + workflow.userLanguage = await self._detectUserLanguage(handover.promptUserInitial) + + # Get workflow summary for context + workflow_summary = await self.workflowSummarize(ChatMessage( + id=str(uuid.uuid4()), + workflowId=workflow.id, + role="user", + message=handover.promptUserInitial + )) + + # Prepare context for AI analysis + context = { + "current_state": { + "previous_agent": handover.currentAgent, + "status": handover.status, + "error": handover.error, + "user_language": workflow.userLanguage, + "input_documents": handover.documentsInput or [], + "output_documents": handover.documentsOutput or [], + "required_capabilities": handover.requiredCapabilities or [] + }, + "conversation_history": workflow_summary, + "available_agents": [ + { + "name": agent.get("name", ""), + "capabilities": agent.get("capabilities", {}), + "description": agent.get("description", "") + } + for agent in availableAgents + ] + } + + # Create prompt for AI to analyze and select next agent + prompt = f""" + Analyze the current workflow state, conversation history, and available agents to determine the most suitable next 
agent. + Consider the following factors: + 1. Previous agent's status and any errors + 2. Required capabilities for the task + 3. Document type compatibility + 4. Language requirements + 5. Agent's capabilities and specializations + 6. Conversation history and context + + Current State: + {json.dumps(context['current_state'], indent=2)} + + Conversation History: + {context['conversation_history']} + + Available Agents: + {json.dumps(context['available_agents'], indent=2)} + + Return a JSON object with the following structure: + {{ + "selected_agent": "name of the most suitable agent", + "reasoning": "brief explanation of why this agent was selected", + "required_capabilities": ["list", "of", "required", "capabilities"], + "potential_risks": ["list", "of", "potential", "issues"], + "task": {{ + "description": "clear description of what the agent needs to do", + "input_format": {{ + "documents": ["list", "of", "required", "input", "documents"], + "data": ["list", "of", "required", "data", "fields"] + }}, + "output_format": {{ + "documents": ["list", "of", "expected", "output", "documents"], + "data": ["list", "of", "expected", "output", "fields"] + }}, + "requirements": [ + "list of specific requirements", + "format requirements", + "quality requirements" + ], + "constraints": [ + "list of constraints", + "time limits", + "resource limits" + ] + }}, + "prompt_template": "template for the agent's prompt with placeholders for dynamic content" + }} + + Format your response as a valid JSON object. 
+ """ + + # Get AI's analysis and selection + response = await self._callAiComplex(prompt) + + try: + analysis = json.loads(response) + selected_agent_name = analysis.get('selected_agent') + + # Find the selected agent in available agents + selected_agent = next( + (agent for agent in availableAgents if agent.get('name') == selected_agent_name), + None + ) + + if selected_agent: + logger.info(f"AI selected agent {selected_agent_name}: {analysis.get('reasoning')}") + # Update handover with AI's analysis + handover.requiredCapabilities = analysis.get('required_capabilities', []) + handover.analysis = { + 'reasoning': analysis.get('reasoning'), + 'potential_risks': analysis.get('potential_risks', []), + 'task': analysis.get('task', {}), + 'prompt_template': analysis.get('prompt_template', '') + } + return selected_agent + else: + logger.warning(f"AI selected agent {selected_agent_name} not found in available agents") + return None + + except json.JSONDecodeError as e: + logger.error(f"Error parsing AI response: {str(e)}") + return None + + except Exception as e: + logger.error(f"Error selecting next agent: {str(e)}") + return None + + async def processNextAgent(self, handover: AgentHandover) -> AgentHandover: + """ + Process the next agent in the workflow. 
+ + Args: + handover: Current handover object + + Returns: + Updated handover object + """ + try: + # Get agent instance + agent = self.agentManager.getAgent(handover.nextAgent) + if not agent: + handover.update_status("failed", f"Agent {handover.nextAgent} not found") + return handover + + # Set current agent + handover.currentAgent = handover.nextAgent + handover.nextAgent = None + + # Execute agent + response = await agent.execute(handover) + + # Update handover with results + if response.success: + handover.update_status("success") + handover.documentsOutput = response.message.documents if response.message else [] + handover.promptFromFinishedAgent = response.message.message if response.message else "" + else: + handover.update_status("failed", response.error) + + return handover + + except Exception as e: + logger.error(f"Error processing next agent: {str(e)}") + handover.update_status("failed", str(e)) + return handover + + # Agent functions + + async def _callAiBasic(self, prompt: str, context: Dict[str, Any] = None) -> str: + """Call basic AI model.""" + try: + response = await self.service.base.callAi(prompt, context or {}, model="aiBase") + return response + except Exception as e: + logger.error(f"Error calling basic AI: {str(e)}") + return "" + + async def _callAiComplex(self, prompt: str, context: Dict[str, Any] = None) -> str: + """Call complex AI model.""" + try: + response = await self.service.base.callAi(prompt, context or {}, model="aiComplex") + return response + except Exception as e: + logger.error(f"Error calling complex AI: {str(e)}") + return "" + + async def _callAiImage(self, prompt: str, context: Dict[str, Any] = None) -> str: + """Call image AI model.""" + try: + response = await self.service.base.callAi(prompt, context or {}, model="aiImage") + return response + except Exception as e: + logger.error(f"Error calling image AI: {str(e)}") + return "" + + def logAdd(self, message: str, level: str = "info", + progress: Optional[int] = None) 
-> str: + """ + Add a log entry to the workflow. + + Args: + message: Log message + level: Log level (info, warning, error) + progress: Optional progress percentage + + Returns: + str: ID of the created log entry + """ + workflow = self.service.workflow + try: + # Generate log ID + logId = str(uuid.uuid4()) + + # Create log entry + logEntry = ChatLog( + id=logId, + workflowId=workflow.id, + message=message, + level=level, + progress=progress, + timestamp=datetime.now().isoformat() + ) + + # Add to workflow logs + workflow.logs.append(logEntry) + + # Also log to Python logger + logLevel = getattr(logging, level.upper()) + logger.log(logLevel, f"[Workflow {workflow.id}] {message}") + + # Save to database + self.chatManager.saveWorkflowLog(workflow.id, logEntry.to_dict()) + + return logId + + except Exception as e: + logger.error(f"Error adding log entry: {str(e)}") + return "" + + async def chatMessageToWorkflow(self, role: str, agent: Union[str, Dict[str, Any]], chatMessage: UserInputRequest) -> ChatMessage: + """ + Integrates chat message input into a Message object including files with complete contents. 
+ + Args: + role: Role of the message sender (e.g., 'user', 'assistant') + agent: Agent name or configuration + chatMessage: UserInputRequest object containing message data and file references + + Returns: + ChatMessage object with complete file contents + """ + try: + # Process additional files with complete contents + additionalFileIds = chatMessage.listFileId or [] + additionalFiles = await self.processFileIds(additionalFileIds) + + # Create message object + message = ChatMessage( + id=str(uuid.uuid4()), + workflowId=self.service.workflow.id, + role=role, + agentName=agent if isinstance(agent, str) else agent.get("name", ""), + message=chatMessage.message, + documents=additionalFiles, + status="completed", + startedAt=datetime.now().isoformat() + ) + + return message + + except Exception as e: + logger.error(f"Error creating workflow message: {str(e)}") + raise + + async def sendFinalMessage(self, handover: AgentHandover) -> ChatMessage: + """ + Send final message to user with workflow results. 
+ + Args: + handover: Final handover object + + Returns: + Final message to user + """ + try: + # Create final message content from handover + messageContent = handover.promptFromFinishedAgent + if handover.status == "failed": + messageContent = f"Workflow failed: {handover.error}" + + # Add summary of generated documents + if handover.documentsOutput: + messageContent += "\n\nGenerated documents:" + for doc in handover.documentsOutput: + messageContent += f"\n- {doc.get('name', 'Unknown')}" + + # Create message object + finalMessage = ChatMessage( + id=str(uuid.uuid4()), + workflowId=self.service.workflow.id, + agentName="Workflow Manager", + message=messageContent, + role="assistant", + status="completed", + sequenceNr=0, + startedAt=datetime.now(UTC).isoformat(), + finishedAt=datetime.now(UTC).isoformat(), + success=handover.status == "success", + documents=handover.documentsOutput + ) + + return finalMessage + + except Exception as e: + logger.error(f"Error sending final message: {str(e)}") + return ChatMessage( + id=str(uuid.uuid4()), + workflowId=self.service.workflow.id, + agentName="Workflow Manager", + message=f"Error in workflow: {str(e)}", + role="system", + status="error", + sequenceNr=0, + startedAt=datetime.now(UTC).isoformat(), + finishedAt=datetime.now(UTC).isoformat(), + success=False + ) + + async def workflowSummarize(self, messageUser: ChatMessage) -> str: + """ + Creates a summary of the workflow without the current user message. 
+ + Args: + messageUser: Current user message + + Returns: + Summary of the workflow + """ + if not self.service.workflow or "messages" not in self.service.workflow or not self.service.workflow["messages"]: + return "" # First message + + # Go through messages in chronological order + messages = sorted(self.service.workflow["messages"], key=lambda m: m.get("sequenceNo", 0), reverse=False) + + summaryParts = [] + for message in messages: + if message["id"] != messageUser["id"]: + messageSummary = await self.messageSummarize(message) + summaryParts.append(messageSummary) + + return "\n\n".join(summaryParts) + + async def messageSummarize(self, message: ChatMessage) -> str: + """ + Creates a summary of a message including its documents. + + Args: + message: Message to summarize + + Returns: + Summary of the message + """ + role = message.role + agentName = message.agentName + content = message.content + + try: + # Use the serviceBase for language-aware AI calls + prompt = f"Create a very concise summary (2-3 sentences, maximum 300 characters) of the following message:\n\n{content}" + contentSummary = await self._callAiBasic(prompt) + except Exception as e: + logger.error(f"Error creating summary: {str(e)}") + contentSummary = content[:200] + "..." + + # Summarize documents + docsSummary = "" + if "documents" in message and message["documents"]: + docsList = [] + for i, doc in enumerate(message["documents"]): + docName = self.getFilename(doc) + docsList.append(docName) + if docsList: + docsSummary = "\nDocuments:" + "\n- ".join(docsList) + + return f"[{role} {agentName}]: {contentSummary}{docsSummary}" + + def getFilename(self, document: ChatDocument) -> str: + """ + Gets the filename from a document by combining name and extension. 
+ + Args: + document: Document object + + Returns: + Filename with extension + """ + name = document.name + ext = document.ext + if ext: + return f"{name}.{ext}" + return name + + async def _detectUserLanguage(self, text: str) -> str: + """ + Detects the language of user input using AI. + + Args: + text: User input text to analyze + + Returns: + Language code (e.g., 'en', 'de', 'fr') + """ + try: + # Use basic AI model for language detection + prompt = f""" + Analyze the following text and identify its language. + Return only the ISO 639-1 language code (e.g., 'en' for English, 'de' for German). + + Text: {text} + """ + response = await self._callAiBasic(prompt) + # Clean and validate response + lang_code = response.strip().lower() + # Basic validation of common language codes + valid_codes = {'en', 'de', 'fr', 'es', 'it', 'pt', 'nl', 'ru', 'zh', 'ja', 'ko'} + return lang_code if lang_code in valid_codes else 'en' + except Exception as e: + logger.error(f"Error detecting language: {str(e)}") + return 'en' # Default to English on error + + +# Singleton factory for the chat manager +def getChatManager(): + return ChatManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/dataAccessFunctions.py b/modules/workflow/dataAccessFunctions.py new file mode 100644 index 00000000..87448544 --- /dev/null +++ b/modules/workflow/dataAccessFunctions.py @@ -0,0 +1,273 @@ +""" +Data access functions for Microsoft and Google services. +Provides standardized interfaces for SharePoint, Outlook, and other services. 
+""" + +from typing import List, Dict, Any, Optional, Union +from datetime import datetime +from pydantic import BaseModel, Field +from enum import Enum + +class ServiceType(str, Enum): + """Service types for data access""" + MSFT = "msft" + GOOGLE = "google" + +class FileRef(BaseModel): + """Reference to a file in storage""" + id: str + name: str + path: str + url: Optional[str] = None + size: Optional[int] = None + lastModified: Optional[datetime] = None + +# SharePoint Functions +class SharePointSearchParams(BaseModel): + """Parameters for SharePoint search""" + userName: str + query: str + site: Optional[str] = None + folder: Optional[str] = None + contentType: Optional[str] = None + createdAfter: Optional[datetime] = None + modifiedAfter: Optional[datetime] = None + maxResults: Optional[int] = 100 + +class SharePointFolderParams(BaseModel): + """Parameters for SharePoint folder operations""" + userName: str + folderPattern: str + site: Optional[str] = None + recursive: bool = False + includeFiles: bool = True + +class SharePointFileParams(BaseModel): + """Parameters for SharePoint file operations""" + userName: str + fileName: str + site: Optional[str] = None + folder: Optional[str] = None + content: Optional[bytes] = None + contentType: Optional[str] = None + +async def Msft_Sharepoint_Search(params: SharePointSearchParams) -> List[Dict[str, Any]]: + """Search SharePoint for files and folders matching criteria""" + # Implementation would go here + pass + +async def Msft_Sharepoint_GetFolders(params: SharePointFolderParams) -> Dict[str, Any]: + """Get SharePoint folders matching pattern""" + # Implementation would go here + pass + +async def Msft_Sharepoint_GetFiles(params: SharePointFileParams) -> Dict[str, Any]: + """Get SharePoint files matching pattern""" + # Implementation would go here + pass + +async def Msft_Sharepoint_GetFile(params: SharePointFileParams) -> Dict[str, Any]: + """Get specific SharePoint file""" + # Implementation would go here + pass + 
async def Msft_Sharepoint_PutFile(params: SharePointFileParams) -> FileRef:
    """Upload file to SharePoint"""
    # BUG FIX: was a silent `pass` returning None despite the FileRef annotation
    raise NotImplementedError("Msft_Sharepoint_PutFile is not implemented yet")

# Outlook Mail Functions
class OutlookMailParams(BaseModel):
    """Parameters for Outlook mail operations"""
    userName: str
    folder: Optional[str] = None
    messageId: Optional[str] = None
    subject: Optional[str] = None
    body: Optional[str] = None
    to: Optional[List[str]] = None
    cc: Optional[List[str]] = None
    bcc: Optional[List[str]] = None
    attachments: Optional[List[FileRef]] = None
    searchString: Optional[str] = None
    fromAddress: Optional[str] = None
    receivedAfter: Optional[datetime] = None
    maxResults: Optional[int] = 100

# BUG FIX (stubs): replace silent `pass`/None returns with explicit
# NotImplementedError so missing integrations fail loudly.

async def Msft_Outlook_ReadMails(params: OutlookMailParams) -> List[Dict[str, Any]]:
    """Read multiple emails from Outlook"""
    raise NotImplementedError("Msft_Outlook_ReadMails is not implemented yet")

async def Msft_Outlook_ReadMail(params: OutlookMailParams) -> Dict[str, Any]:
    """Read specific email from Outlook"""
    raise NotImplementedError("Msft_Outlook_ReadMail is not implemented yet")

async def Msft_Outlook_DraftMail(params: OutlookMailParams) -> Dict[str, Any]:
    """Create draft email in Outlook"""
    raise NotImplementedError("Msft_Outlook_DraftMail is not implemented yet")

async def Msft_Outlook_SendMail(params: OutlookMailParams) -> Dict[str, Any]:
    """Send email through Outlook"""
    raise NotImplementedError("Msft_Outlook_SendMail is not implemented yet")

# Outlook Calendar Functions
class OutlookCalendarParams(BaseModel):
    """Parameters for Outlook calendar operations"""
    userName: str
    calendar: Optional[str] = None
    eventId: Optional[str] = None
    subject: Optional[str] = None
    body: Optional[str] = None
    startTime: Optional[datetime] = None
    endTime: Optional[datetime] = None
    location: Optional[str] = None
    organizer: Optional[str] = None
    attendees: Optional[List[str]] = None
    searchString: Optional[str] = None
    maxResults: Optional[int] = 100
appointments""" + # Implementation would go here + pass + +async def Msft_Outlook_CreateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: + """Create new calendar appointment""" + # Implementation would go here + pass + +async def Msft_Outlook_ReadAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: + """Read specific calendar appointment""" + # Implementation would go here + pass + +async def Msft_Outlook_UpdateAppointment(params: OutlookCalendarParams) -> Dict[str, Any]: + """Update existing calendar appointment""" + # Implementation would go here + pass + +async def Msft_Outlook_DeleteAppointment(params: OutlookCalendarParams) -> bool: + """Delete calendar appointment""" + # Implementation would go here + pass + +def get_data_access_functions() -> List[Dict[str, Any]]: + """ + Dynamically generates a comprehensive list of all available data access functions + with their parameters for use in agent prompts. + """ + import inspect + import sys + + functions = [] + current_module = sys.modules[__name__] + + # Get all functions in the module + for name, obj in inspect.getmembers(current_module): + # Check if it's a function and starts with Msft_ or Google_ + if inspect.isfunction(obj) and (name.startswith('Msft_') or name.startswith('Google_')): + # Get function signature + sig = inspect.signature(obj) + + # Get return type annotation + return_type = obj.__annotations__.get('return', 'Any') + if hasattr(return_type, '__origin__'): + return_type = str(return_type) + + # Get parameter model class + param_model = None + for param in sig.parameters.values(): + if param.annotation.__module__ == __name__: + param_model = param.annotation + break + + # Determine authority from function name + authority = ServiceType.MSFT if name.startswith('Msft_') else ServiceType.GOOGLE + + # Create function entry + function_entry = { + "name": name, + "description": obj.__doc__ or "", + "parameters": param_model.schema() if param_model else {}, + "return_type": 
class DataAccess:
    """Manages data access functions for different services"""

    def __init__(self):
        """Discover the module's service functions and group them."""
        self.functions = get_data_access_functions()
        self._initialize_functions()

    def _initialize_functions(self):
        """Group the discovered function objects by service authority."""
        self.msft_functions = {
            entry['name']: globals()[entry['name']]
            for entry in self.functions
            if entry['authority'] == ServiceType.MSFT
        }
        self.google_functions = {
            entry['name']: globals()[entry['name']]
            for entry in self.functions
            if entry['authority'] != ServiceType.MSFT
        }

    def _service_view(self, funcs: Dict[str, Any], name: str,
                      description: str, authority: ServiceType) -> Dict[str, Any]:
        """Build the uniform {functions, metadata} view for one service."""
        return {
            'functions': funcs,
            'metadata': {
                'name': name,
                'description': description,
                'functions': [f for f in self.functions if f['authority'] == authority]
            }
        }

    @property
    def msft(self) -> Dict[str, Any]:
        """Get Microsoft service functions and metadata"""
        return self._service_view(
            self.msft_functions,
            'Microsoft Services',
            'Microsoft Office 365 and SharePoint services',
            ServiceType.MSFT
        )

    @property
    def google(self) -> Dict[str, Any]:
        """Get Google service functions and metadata"""
        return self._service_view(
            self.google_functions,
            'Google Services',
            'Google Workspace services',
            ServiceType.GOOGLE
        )

    @property
    def utils(self) -> Dict[str, Any]:
        """Get utility functions for data access"""
        return {
            'getAvailableFunctions': lambda: self.functions,
            'getFunctionInfo': lambda name: next((f for f in self.functions if f['name'] == name), None),
            'getServiceFunctions': lambda service_type: [f for f in self.functions if f['authority'] == service_type]
        }

    def to_service_object(self) -> Dict[str, Any]:
        """Convert to service object format"""
        return {
            'msft': self.msft,
            'google': self.google,
            'utils': self.utils
        }
self.utils + } + +def get_data_access() -> DataAccess: + """Get a singleton instance of the data access manager""" + if not hasattr(get_data_access, '_instance'): + get_data_access._instance = DataAccess() + return get_data_access._instance + diff --git a/modules/workflow/documentManager.py b/modules/workflow/documentManager.py index 2a6dc8c1..6e627c3a 100644 --- a/modules/workflow/documentManager.py +++ b/modules/workflow/documentManager.py @@ -8,6 +8,8 @@ from datetime import datetime from modules.interfaces.serviceChatModel import ChatDocument, ChatContent from modules.workflow.documentProcessor import getDocumentContents import uuid +import json +import base64 logger = logging.getLogger(__name__) @@ -48,7 +50,15 @@ class DocumentManager: return True async def extractContent(self, fileId: str) -> Optional[ChatDocument]: - """Extracts content from a file and creates a chat document.""" + """ + Extract content from a file. + + Args: + fileId: ID of the file to extract content from + + Returns: + ChatDocument object if successful, None otherwise + """ try: # Get file content fileContent = await self.getFileContent(fileId) @@ -59,8 +69,8 @@ class DocumentManager: fileMetadata = await self.getFileMetadata(fileId) if not fileMetadata: return None - - # Create chat document + + # Create ChatDocument return ChatDocument( id=str(uuid.uuid4()), fileId=fileId, @@ -73,27 +83,6 @@ class DocumentManager: logger.error(f"Error extracting content from file {fileId}: {str(e)}") return None - async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: - """ - Process multiple files and extract their contents. 
- - Args: - fileIds: List of file IDs to process - - Returns: - List of ChatDocument objects - """ - documents = [] - for fileId in fileIds: - try: - document = await self.extractContent(fileId) - if document: - documents.append(document) - except Exception as e: - logger.error(f"Error processing file {fileId}: {str(e)}") - continue - return documents - async def getFileContent(self, fileId: str) -> Optional[bytes]: """Gets the content of a file.""" try: @@ -136,6 +125,272 @@ class DocumentManager: logger.error(f"Error deleting file {fileId}: {str(e)}") return False + async def convertFileRefToId(self, ref: str) -> Optional[int]: + """ + Convert agent file reference to file ID. + + Args: + ref: File reference in format 'filename;id' or just 'id' + + Returns: + File ID if successful, None otherwise + """ + try: + # Extract file ID from reference format + if isinstance(ref, str) and ';' in ref: + return int(ref.split(';')[1]) + return int(ref) + except Exception as e: + logger.error(f"Error converting file reference to ID: {str(e)}") + return None + + async def convertFileIdToRef(self, fileId: str) -> Optional[str]: + """ + Convert file ID to agent file reference. + + Args: + fileId: File ID to convert + + Returns: + File reference in format 'filename;id' if successful, None otherwise + """ + try: + file = await self.getFileMetadata(fileId) + if not file: + return None + return f"{file['name']};{fileId}" + except Exception as e: + logger.error(f"Error converting file ID to reference: {str(e)}") + return None + + async def convertDataFormat(self, data: Any, format: str) -> Any: + """ + Convert data between different formats. + + Args: + data: Data to convert + format: Target format ('json', 'base64', etc.) 
+ + Returns: + Converted data + """ + try: + if format == 'json': + if isinstance(data, str): + return json.loads(data) + return json.dumps(data) + elif format == 'base64': + if isinstance(data, str): + return base64.b64encode(data.encode('utf-8')).decode('utf-8') + return base64.b64encode(data).decode('utf-8') + return data + except Exception as e: + logger.error(f"Error converting data format: {str(e)}") + return data + + async def createAgentInputFileList(self, files: List[str]) -> List[Dict[str, Any]]: + """ + Create a list of input files for agent processing. + + Args: + files: List of file references + + Returns: + List of file objects with content + """ + try: + inputFiles = [] + for file in files: + fileId = await self.convertFileRefToId(file) + if fileId: + fileData = await self.getFileMetadata(fileId) + if fileData: + content = await self.getFileContent(fileId) + inputFiles.append({ + 'id': fileId, + 'name': fileData['name'], + 'mimeType': fileData['mimeType'], + 'content': content + }) + return inputFiles + except Exception as e: + logger.error(f"Error creating agent input file list: {str(e)}") + return [] + + async def saveAgentOutputFiles(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Save output files from agent processing. 
    async def saveAgentOutputFiles(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Save output files from agent processing.

        Args:
            files: List of file dicts with 'name', 'content' and optional 'mimeType'

        Returns:
            List of saved file metadata ({'id', 'name', 'mimeType'});
            empty list on any failure
        """
        try:
            savedFiles = []
            for file in files:
                # Persist each file; saveFile returns the new file's id (or falsy on failure)
                fileMeta = await self.saveFile(
                    filename=file['name'],
                    content=file['content'],
                    mimeType=file.get('mimeType', 'application/octet-stream')
                )

                if fileMeta:
                    savedFiles.append({
                        'id': fileMeta,
                        'name': file['name'],
                        'mimeType': file.get('mimeType', 'application/octet-stream')
                    })
            return savedFiles
        except Exception as e:
            logger.error(f"Error saving agent output files: {str(e)}")
            return []

    async def contentWithPrompt(self, document: Dict[str, Any], prompt: str) -> Optional[Dict[str, Any]]:
        """
        Extract content from a document using AI with a specific prompt.
        Handles large files by processing in chunks and merging results.

        Args:
            document: Document object with file information ('id' key is used)
            prompt: Specific prompt for content extraction

        Returns:
            Dictionary with extracted content and metadata, or None on failure
        """
        try:
            # First get the document content
            # NOTE(review): extractContent expects a fileId; here it receives
            # document['id'] — confirm those identifiers are the same thing.
            chat_doc = await self.extractContent(document.get('id'))
            if not chat_doc:
                return None

            # Prepare the content for AI processing
            content = chat_doc.content
            mime_type = chat_doc.mimeType

            # For large files, process in chunks
            if len(content) > 100000:  # Arbitrary threshold, adjust as needed
                chunks = self._splitContentIntoChunks(content, mime_type)
                extracted_chunks = []

                for chunk in chunks:
                    # Process each chunk with AI; failed chunks are dropped
                    chunk_result = await self._processContentChunk(chunk, prompt)
                    if chunk_result:
                        extracted_chunks.append(chunk_result)

                # Merge results
                return {
                    "content": self._mergeChunkResults(extracted_chunks),
                    "metadata": {
                        "original_size": len(content),
                        "chunks_processed": len(chunks),
                        "mime_type": mime_type
                    }
                }
            else:
                # Process single chunk
                result = await self._processContentChunk(content, prompt)
                return {
                    "content": result,
                    "metadata": {
                        "original_size": len(content),
                        "chunks_processed": 1,
                        "mime_type": mime_type
                    }
                }

        except Exception as e:
            logger.error(f"Error in contentWithPrompt: {str(e)}")
            return None

    def _splitContentIntoChunks(self, content: str, mime_type: str) -> List[str]:
        """
        Split content into manageable chunks based on mime type.

        Text splits on blank lines, JSON arrays split per element, everything
        else falls back to fixed 10000-character slices.

        Args:
            content: Content to split
            mime_type: MIME type of the content

        Returns:
            List of content chunks; [content] unchanged on error
        """
        try:
            if mime_type.startswith('text/'):
                # Split text content by paragraphs or sections
                # NOTE(review): a single paragraph larger than the caller's
                # 100000-char threshold is not re-split here — confirm intent.
                return [chunk.strip() for chunk in content.split('\n\n') if chunk.strip()]
            elif mime_type == 'application/json':
                # Split JSON content by objects (top-level arrays only)
                data = json.loads(content)
                if isinstance(data, list):
                    return [json.dumps(item) for item in data]
                return [content]
            else:
                # Default chunking
                return [content[i:i+10000] for i in range(0, len(content), 10000)]
        except Exception as e:
            logger.error(f"Error splitting content: {str(e)}")
            return [content]

    async def _processContentChunk(self, chunk: str, prompt: str) -> Optional[str]:
        """
        Process a single content chunk with AI.

        Args:
            chunk: Content chunk to process
            prompt: Extraction prompt

        Returns:
            Processed content, or None on failure
        """
        try:
            # Create AI prompt
            ai_prompt = f"""
            Extract relevant information from this content based on the following prompt:

            PROMPT: {prompt}

            CONTENT:
            {chunk}

            Return ONLY the extracted information in a clear, concise format.
            """

            # Get AI response
            # NOTE(review): this calls callAi with a message list, while the
            # chat manager calls callAi(prompt, context, model=...) — confirm
            # the service accepts both call shapes.
            response = await self.service.base.callAi([
                {"role": "system", "content": "You are an expert at extracting relevant information from documents."},
                {"role": "user", "content": ai_prompt}
            ])

            return response.strip()

        except Exception as e:
            logger.error(f"Error processing content chunk: {str(e)}")
            return None
+ + Args: + chunks: List of processed chunks + + Returns: + Merged content + """ + try: + # Remove duplicates and empty chunks + chunks = [chunk for chunk in chunks if chunk and chunk.strip()] + + # Merge chunks with appropriate spacing + return "\n\n".join(chunks) + + except Exception as e: + logger.error(f"Error merging chunk results: {str(e)}") + return "" + # Singleton factory for the document manager def getDocumentManager(): return DocumentManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/taskManager.py b/modules/workflow/taskManager.py deleted file mode 100644 index 63a4e08c..00000000 --- a/modules/workflow/taskManager.py +++ /dev/null @@ -1,215 +0,0 @@ -""" -Task Manager Module for managing task states and transitions. -""" - -import logging -from typing import Dict, Any, List, Optional -from datetime import datetime, UTC -import uuid -from modules.interfaces.serviceChatModel import Task, ChatLog, ChatMessage - -logger = logging.getLogger(__name__) - -class TaskManager: - """Manager for task state management and transitions.""" - - _instance = None - - @classmethod - def getInstance(cls): - """Return a singleton instance of the task manager.""" - if cls._instance is None: - cls._instance = cls() - return cls._instance - - def __init__(self): - """Initialize the task manager.""" - if TaskManager._instance is not None: - raise RuntimeError("Singleton instance already exists - use getInstance()") - - self.service = None - - def initialize(self, service=None): - """Initialize or update the manager with service references.""" - if service: - # Validate required interfaces - required_interfaces = ['base', 'msft', 'google'] - missing_interfaces = [] - for interface in required_interfaces: - if not hasattr(service, interface): - missing_interfaces.append(interface) - - if missing_interfaces: - logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}") - return False - - self.service = service - 
return True - - def createTask(self, workflowId: str, agentName: str, prompt: str, userLanguage: str, - filesInput: List[str] = None, filesOutput: List[str] = None) -> Task: - """ - Create a new task. - - Args: - workflowId: ID of the workflow this task belongs to - agentName: Name of the agent to execute the task - prompt: Task prompt - userLanguage: User's preferred language - filesInput: List of input files - filesOutput: List of output files - - Returns: - New Task object - """ - return Task( - id=str(uuid.uuid4()), - workflowId=workflowId, - agentName=agentName, - status="pending", - progress=0.0, - prompt=prompt, - userLanguage=userLanguage, - filesInput=filesInput or [], - filesOutput=filesOutput or [], - startedAt=datetime.now(UTC).isoformat() - ) - - def updateTaskStatus(self, task: Task, newStatus: str, progress: float = None, - error: str = None, result: ChatMessage = None) -> Task: - """ - Update task status and related fields. - - Args: - task: Task to update - newStatus: New status value - progress: Optional progress value - error: Optional error message - result: Optional result message - - Returns: - Updated Task object - """ - # Validate status transition - valid_transitions = { - "pending": ["running", "failed"], - "running": ["completed", "failed"], - "completed": [], - "failed": [] - } - - if newStatus not in valid_transitions.get(task.status, []): - logger.warning(f"Invalid status transition from {task.status} to {newStatus}") - return task - - # Update task fields - task.status = newStatus - if progress is not None: - task.progress = progress - if error is not None: - task.error = error - if result is not None: - task.result = result - - # Update timestamps - if newStatus in ["completed", "failed"]: - task.finishedAt = datetime.now(UTC).isoformat() - - return task - - def createTaskLog(self, task: Task, message: str, logType: str = "info") -> ChatLog: - """ - Create a log entry for a task. 
- - Args: - task: Task to create log for - message: Log message - logType: Type of log entry - - Returns: - New ChatLog object - """ - return ChatLog( - id=str(uuid.uuid4()), - workflowId=task.workflowId, - message=message, - type=logType, - timestamp=datetime.now(UTC).isoformat(), - agentName=task.agentName, - status=task.status, - progress=task.progress - ) - - def updateTaskProgress(self, task: Task, progress: float, message: str = None) -> Task: - """ - Update task progress and optionally create a log entry. - - Args: - task: Task to update - progress: New progress value (0-100) - message: Optional progress message - - Returns: - Updated Task object - """ - # Validate progress value - if not 0 <= progress <= 100: - logger.warning(f"Invalid progress value: {progress}") - return task - - # Update progress - task.progress = progress - - # Create log entry if message provided - if message: - log = self.createTaskLog(task, message, "progress") - if self.service and hasattr(self.service, 'logAdd'): - self.service.logAdd(log) - - return task - - def handleTaskError(self, task: Task, error: str) -> Task: - """ - Handle task error and update task state. - - Args: - task: Task to update - error: Error message - - Returns: - Updated Task object - """ - # Update task status - task = self.updateTaskStatus(task, "failed", error=error) - - # Create error log - log = self.createTaskLog(task, f"Task failed: {error}", "error") - if self.service and hasattr(self.service, 'logAdd'): - self.service.logAdd(log) - - return task - - def completeTask(self, task: Task, result: ChatMessage) -> Task: - """ - Mark task as completed and set result. 
- - Args: - task: Task to complete - result: Result message - - Returns: - Updated Task object - """ - # Update task status - task = self.updateTaskStatus(task, "completed", progress=100.0, result=result) - - # Create completion log - log = self.createTaskLog(task, "Task completed successfully", "info") - if self.service and hasattr(self.service, 'logAdd'): - self.service.logAdd(log) - - return task - -# Singleton factory for the task manager -def getTaskManager(): - return TaskManager.getInstance() \ No newline at end of file diff --git a/modules/workflow/workflowManager.py b/modules/workflow/workflowManager.py index 98df49fb..a27589a6 100644 --- a/modules/workflow/workflowManager.py +++ b/modules/workflow/workflowManager.py @@ -3,230 +3,38 @@ Workflow Manager Module for state machine-based backend chat workflow. Implements the state machine as defined in the documentation. """ -import asyncio -import os import logging -import json -import uuid -import base64 from datetime import datetime, UTC, timedelta -from typing import Dict, Any, List, Optional, Union, Tuple, Callable, TypedDict, Protocol import time -from modules.shared.mimeUtils import isTextMimeType -# Required imports -from modules.workflow.agentManager import getAgentManager -from modules.workflow.taskManager import getTaskManager -from modules.workflow.documentManager import getDocumentManager -from modules.interfaces.serviceChatModel import ( - UserInputRequest, ChatWorkflow, ChatMessage, ChatLog, - ChatDocument, ChatStat, Task, AgentResponse, AgentProfile -) +from modules.workflow.chatManager import getChatManager +from modules.interfaces.serviceChatModel import ( UserInputRequest, ChatWorkflow ) # Configure logger logger = logging.getLogger(__name__) -# Global settings for the workflow management -GLOBAL_WORKFLOW_LABELS = { - "systemName": "AI Assistant", # Default system name for logs - "workflowStatusMessages": { - "init": "Workflow initialized", - "running": "Running workflow", - "waiting": 
"Waiting for input", - "completed": "Workflow completed successfully", - "stopped": "Workflow stopped by user", - "failed": "Error in workflow" - } -} - class WorkflowStoppedException(Exception): """Exception raised when a workflow is forcibly stopped with function checkExitCriteria() """ pass -class ServiceObject: - """Service object structure available to agents.""" - def __init__(self): - self.user: Dict[str, Any] = {} # User context - self.operator: Dict[str, Callable] = {} # Document operations - self.workflow: Dict[str, Any] = {} # Workflow context - self.functions: Any = None # Core functions - self.logAdd: Callable = None # Logging function - class WorkflowManager: """Manages the execution of workflows and their associated agents.""" - - def __init__(self, service: ServiceObject): - """Initialize the workflow manager with service container.""" - # Store service container - self.service = service - self.service.logAdd = self.logAdd - - # Initialize managers - self.agentManager = getAgentManager() - self.taskManager = getTaskManager() - self.documentManager = getDocumentManager() - - # Initialize managers with service - self.agentManager.initialize(service=self.service) - self.documentManager.initialize(service=self.service) - # Add agent service functionality directly to service object - service.user = { - 'attributes': service.user.get('attributes', {}), - 'connection': service.user.get('connection', []) - } - - # Add operator functions - service.operator = { - 'forEach': lambda items, func: [func(item) for item in items], - 'aiCall': service.functions.callAi, - 'extract': lambda file: self.documentManager.extractContent(file), - 'fileRefToFileId': lambda ref: self.documentManager.convertFileRefToId(ref), - 'fileIdToFileRef': lambda fileId: self.documentManager.convertFileIdToRef(fileId), - 'convert': lambda data, format: self.documentManager.convertDataFormat(data, format), - 'createAgentInputFiles': lambda files: 
self.documentManager.createAgentInputFileList(files), - 'saveAgentOutputFiles': lambda files: self.documentManager.saveAgentOutputFiles(files) - } - - # Add workflow context - service.workflow = { - 'activeTask': { - 'id': None, - 'progress': 0, - 'status': 'pending' - }, - 'tasks': [] - } + def __init__(self): + """Initialize the workflow manager.""" + self.chatManager = getChatManager() - def _extractFileContent(self, file): - """Extract content from a file for agent processing.""" - try: - fileData = self.service.functions.getFileData(file['id']) - if fileData is None: - return None - - # Handle base64 encoded content - if file.get('base64Encoded', False): - import base64 - return base64.b64decode(fileData) - - # Handle text content - if isinstance(fileData, bytes): - return fileData.decode('utf-8') - return fileData - - except Exception as e: - logger.error(f"Error extracting file content: {str(e)}") - return None - - def _convertFileRefToId(self, ref): - """Convert agent file reference to file ID.""" - try: - # Extract file ID from reference format - if isinstance(ref, str) and ';' in ref: - return int(ref.split(';')[1]) - return int(ref) - except Exception as e: - logger.error(f"Error converting file reference to ID: {str(e)}") - return None - - def _convertFileIdToRef(self, fileId): - """Convert file ID to agent file reference.""" - try: - file = self.service.functions.getFile(fileId) - if not file: - return None - return f"{file['name']};{fileId}" - except Exception as e: - logger.error(f"Error converting file ID to reference: {str(e)}") - return None - - def _convertDataFormat(self, data, format): - """Convert data between different formats.""" - try: - if format == 'json': - if isinstance(data, str): - return json.loads(data) - return json.dumps(data) - elif format == 'base64': - import base64 - if isinstance(data, str): - return base64.b64encode(data.encode('utf-8')).decode('utf-8') - return base64.b64encode(data).decode('utf-8') - return data - except 
Exception as e: - logger.error(f"Error converting data format: {str(e)}") - return data - - def _createAgentInputFileList(self, files): - """Create a list of input files for agent processing.""" - try: - inputFiles = [] - for file in files: - fileId = self._convertFileRefToId(file) - if fileId: - fileData = self.service.functions.getFile(fileId) - if fileData: - inputFiles.append({ - 'id': fileId, - 'name': fileData['name'], - 'mimeType': fileData['mimeType'], - 'content': self._extractFileContent(fileData) - }) - return inputFiles - except Exception as e: - logger.error(f"Error creating agent input file list: {str(e)}") - return [] - - def _saveAgentOutputFiles(self, files): - """Save output files from agent processing.""" - try: - savedFiles = [] - for file in files: - # Create file metadata - fileMeta = self.service.functions.createFile( - name=file['name'], - mimeType=file.get('mimeType', 'application/octet-stream'), - size=len(file['content']) - ) - - if fileMeta and 'id' in fileMeta: - # Save file content - if self.service.functions.createFileData(fileMeta['id'], file['content']): - savedFiles.append({ - 'id': fileMeta['id'], - 'name': file['name'], - 'mimeType': file.get('mimeType', 'application/octet-stream') - }) - return savedFiles - except Exception as e: - logger.error(f"Error saving agent output files: {str(e)}") - return [] - - async def workflowStart(self, userInput: UserInputRequest, workflowId: Optional[str] = None) -> ChatWorkflow: - """Starts a new workflow or continues an existing one.""" - # 1. 
Initialize workflow or load existing one - workflow = self.workflowInit(workflowId) - self.logAdd(workflow, "Starting workflow processing", level="info", progress=0) - - # Start asynchronous processing - asyncio.create_task(self.workflowProcess(userInput, workflow)) - - return workflow - - def checkExitCriteria(self, workflow: ChatWorkflow) -> None: + def initialize(self, workflow: ChatWorkflow): """ - Check if the workflow should exit based on the current state. - Raises WorkflowStoppedException if workflow should stop. + Initialize the workflow manager with a workflow object. Args: - workflow: ChatWorkflow object to check + workflow: ChatWorkflow object to initialize with """ - current_workflow = self.service.functions.loadWorkflowState(workflow.id) - if current_workflow["status"] in ["stopped", "failed"]: - self.logAdd(workflow, f"Workflow processing terminated due to status: {current_workflow['status']}", level="info") - # Raise an exception to stop execution - raise WorkflowStoppedException(f"Workflow execution stopped due to status: {current_workflow['status']}") + self.chatManager.initialize(workflow) + return True + + # Main function to start workflow process async def workflowProcess(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatWorkflow: """ @@ -242,132 +50,53 @@ class WorkflowManager: """ startTime = time.time() try: - # State 3: User Message Processing + # Initialize workflow + self.initialize(workflow) + + # Check if workflow should exit self.checkExitCriteria(workflow) - messageUser = await self.chatMessageToWorkflow("user", None, { - "prompt": userInput.prompt, - "listFileId": userInput.listFileId - }, workflow) - messageUser.status = "first" # For first message - # State 4: Project Manager Analysis - self.checkExitCriteria(workflow) - self.logAdd(workflow, "Analyzing request and planning work", level="info", progress=10) - projectManagerResponse = await self.projectManagerAnalysis(messageUser, workflow) - objFinalDocuments = 
projectManagerResponse.get("objFinalDocuments", []) - objWorkplan = projectManagerResponse.get("objWorkplan", []) - objUserResponse = projectManagerResponse.get("objUserResponse", "") + # Create initial handover + handover = self.chatManager.createInitialHandover(userInput) - # Get detected language and set it in the serviceBase interface - self.checkExitCriteria(workflow) - userLanguage = projectManagerResponse.get("userLanguage", "en") - workflow.userLanguage = userLanguage - self.service.functions.setUserLanguage(userLanguage) - - # Save the response as a message in the workflow and add log entries - self.checkExitCriteria(workflow) - responseMessage = ChatMessage( - id=str(uuid.uuid4()), - workflowId=workflow.id, - agentName="Project Manager", - message=objUserResponse, - role="assistant", - status="step", - sequenceNr=len(workflow.messages) + 1, - startedAt=datetime.now(UTC).isoformat(), - finishedAt=datetime.now(UTC).isoformat(), - success=True - ) - workflow.messages.append(responseMessage) - - # Add detailed log entry about the task plan - taskPlanLog = "Input: " - if objFinalDocuments: - taskPlanLog += ", ".join(objFinalDocuments) + "