"""
Data analyst agent for analysis and interpretation of data.
Focuses on output-first design with AI-powered analysis.
"""

import base64
import html
import io
import json
import logging
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from modules.workflowAgentsRegistry import AgentBase

logger = logging.getLogger(__name__)

# Output file extensions handled by the visualization / data generators;
# every other extension is produced as a text document.
_IMAGE_FORMATS = ("png", "jpg", "jpeg", "svg")
_DATA_FORMATS = ("csv", "json", "xlsx")

# Registered media types ("image/jpg" and "image/svg" are not valid types).
_IMAGE_CONTENT_TYPES = {
    "png": "image/png",
    "jpg": "image/jpeg",
    "jpeg": "image/jpeg",
    "svg": "image/svg+xml",
}
_DATA_CONTENT_TYPES = {
    "csv": "text/csv",
    "json": "application/json",
    "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}
_TEXT_CONTENT_TYPES = {
    "md": "text/markdown",
    "markdown": "text/markdown",
    "html": "text/html",
}


class AgentAnalyst(AgentBase):
    """AI-driven agent for data analysis and visualization"""

    def __init__(self):
        """Initialize the data analysis agent"""
        super().__init__()
        self.name = "analyst"
        self.label = "Data Analysis"
        self.description = "Analyzes data using AI-powered insights and visualizations, produce diagrams and visualizations"
        self.capabilities = [
            "dataAnalysis",
            "statistics",
            "visualization",
            "dataInterpretation",
            "reportGeneration",
        ]

        # Set default visualization settings. The seaborn styles were renamed
        # to "seaborn-v0_8-*" in matplotlib 3.6, so fall back to the legacy
        # name instead of failing construction on older installations.
        for styleName in ("seaborn-v0_8-whitegrid", "seaborn-whitegrid"):
            if styleName in plt.style.available:
                plt.style.use(styleName)
                break
        else:
            logger.warning("No seaborn whitegrid style available; using matplotlib defaults")

    def setDependencies(self, mydom=None):
        """Set external dependencies for the agent."""
        self.mydom = mydom

    async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a task by focusing on required outputs and using AI to generate them.

        Args:
            task: Task dictionary with prompt, inputDocuments, outputSpecifications

        Returns:
            Dictionary with feedback and documents
        """
        try:
            # Extract task information; a key that is present but None is
            # treated the same as a missing key.
            prompt = task.get("prompt", "")
            inputDocuments = task.get("inputDocuments") or []
            outputSpecs = task.get("outputSpecifications") or []

            # Check AI service (setDependencies may never have been called)
            if not getattr(self, "mydom", None):
                return {
                    "feedback": "The Analyst agent requires an AI service to function.",
                    "documents": [],
                }

            # Extract data from documents - focusing only on dataExtracted
            datasets, documentContext = self._extractData(inputDocuments)

            # Generate task analysis to understand what's needed
            analysisPlan = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs)

            # Generate all required output documents. Without output
            # specifications no documents are produced, only feedback.
            documents = []
            for spec in outputSpecs:
                if not isinstance(spec, dict):
                    logger.warning(f"Skipping malformed output specification: {spec!r}")
                    continue

                outputLabel = spec.get("label") or ""
                outputDescription = spec.get("description", "")

                # Determine type based on file extension
                outputType = outputLabel.split(".")[-1].lower() if "." in outputLabel else "txt"

                if outputType in _IMAGE_FORMATS:
                    document = await self._createVisualization(
                        datasets, prompt, outputLabel, analysisPlan, outputDescription
                    )
                elif outputType in _DATA_FORMATS:
                    document = await self._createDataDocument(
                        datasets, prompt, outputLabel, analysisPlan, outputDescription
                    )
                else:
                    # Text document (report, analysis, etc.)
                    document = await self._createTextDocument(
                        datasets, documentContext, prompt, outputLabel,
                        outputType, analysisPlan, outputDescription
                    )
                documents.append(document)

            # Generate feedback from whichever plan parts are present, so a
            # missing "analysisApproach" no longer shows up as the text "None".
            feedbackParts = (analysisPlan.get("analysisApproach"), analysisPlan.get("keyInsights"))
            feedback = "\n\n".join(str(part) for part in feedbackParts if part)

            return {"feedback": feedback, "documents": documents}

        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of raising.
            logger.error(f"Error in analysis: {str(e)}", exc_info=True)
            return {"feedback": f"Error during analysis: {str(e)}", "documents": []}

    @staticmethod
    def _readDelimited(text: str, sep: str, label: str) -> Optional[pd.DataFrame]:
        """Parse delimited text into a DataFrame; return None when parsing fails."""
        try:
            return pd.read_csv(io.StringIO(text), sep=sep)
        except Exception as e:
            # Best-effort parsing: unparseable input is expected and not an error.
            logger.debug(f"Could not parse '{label}' as delimited text: {str(e)}")
            return None

    @staticmethod
    def _parseJsonDatasets(text: str, docName: str) -> Optional[Dict[str, pd.DataFrame]]:
        """
        Parse JSON text into zero or more DataFrames.

        Returns:
            None when the text is not valid JSON, otherwise a (possibly empty)
            mapping of dataset name to DataFrame.
        """
        try:
            jsonData = json.loads(text)
        except (ValueError, TypeError) as e:
            logger.debug(f"Could not parse '{docName}' as JSON: {str(e)}")
            return None

        # A top-level list becomes one dataset; a dict with list values becomes
        # one dataset per non-empty list; any other dict becomes a single row.
        candidates: Dict[str, Any] = {}
        if isinstance(jsonData, list):
            candidates[docName] = jsonData
        elif isinstance(jsonData, dict):
            if any(isinstance(v, list) for v in jsonData.values()):
                for key, value in jsonData.items():
                    if isinstance(value, list) and len(value) > 0:
                        candidates[f"{docName}:{key}"] = value
            else:
                candidates[docName] = [jsonData]

        parsed: Dict[str, pd.DataFrame] = {}
        for name, records in candidates.items():
            try:
                parsed[name] = pd.DataFrame(records)
            except Exception as e:
                logger.debug(f"Could not build a DataFrame for '{name}': {str(e)}")
        return parsed

    def _extractData(self, documents: List[Dict[str, Any]]) -> Tuple[Dict[str, pd.DataFrame], str]:
        """
        Extract data from documents, focusing on dataExtracted fields.

        Args:
            documents: List of input documents

        Returns:
            Tuple of (datasets dictionary, document context text)
        """
        datasets: Dict[str, pd.DataFrame] = {}
        contextParts: List[str] = []

        for doc in documents:
            docName = doc.get("name", "unnamed")
            if doc.get("ext"):
                docName = f"{docName}.{doc.get('ext')}"
            contextParts.append(f"\n\n--- {docName} ---\n")
            lowerName = docName.lower()

            for content in doc.get("contents", []):
                # Focus only on dataExtracted
                extractedText = content.get("dataExtracted")
                if not extractedText:
                    continue
                contextParts.append(extractedText)

                # Try the parser that matches the file extension first.
                parsedStructured = False
                if lowerName.endswith((".csv", ".tsv")):
                    # TSV content is tab-separated; parsing it with the comma
                    # default would yield a single useless column.
                    sep = "\t" if lowerName.endswith(".tsv") else ","
                    frame = self._readDelimited(extractedText, sep, docName)
                    if frame is not None:
                        datasets[docName] = frame
                        parsedStructured = True
                elif lowerName.endswith(".json"):
                    parsedJson = self._parseJsonDatasets(extractedText, docName)
                    if parsedJson is not None:
                        datasets.update(parsedJson)
                        parsedStructured = True

                # Try to detect tabular data in text content. Skipped when the
                # content was already parsed, otherwise e.g. pretty-printed JSON
                # would be re-read as CSV because it contains commas.
                if parsedStructured or docName in datasets:
                    continue
                lines = extractedText.splitlines()
                if len(lines) <= 2:
                    continue
                head = lines[:5]
                if any("," in line for line in head):
                    sep = ","
                elif any("\t" in line for line in head):
                    sep = "\t"
                else:
                    continue
                frame = self._readDelimited(extractedText, sep, docName)
                if frame is not None and len(frame.columns) > 1:
                    datasets[docName] = frame

        return datasets, "".join(contextParts)

    @staticmethod
    def _toJson(value: Any) -> str:
        """Serialize value for a prompt, tolerating objects json cannot encode."""
        try:
            # default=str covers values such as Timestamps and column labels.
            return json.dumps(value, indent=2, default=str)
        except (TypeError, ValueError):
            # e.g. non-string dictionary keys; a plain repr is enough for a prompt.
            return str(value)

    @staticmethod
    def _fallbackPlan() -> Dict[str, Any]:
        """Return the generic plan used when the AI plan cannot be obtained."""
        return {
            "analysisType": "general",
            "keyQuestions": ["What insights can be extracted from this data?"],
            "recommendedVisualizations": [],
            "keyInsights": "Analysis plan could not be created",
            "analysisApproach": "General exploratory analysis",
        }

    async def _analyzeTask(self, prompt: str, context: str, datasets: Dict, outputSpecs: List) -> Dict:
        """
        Use AI to analyze the task and create a plan for analysis.

        Args:
            prompt: The task prompt
            context: Document context text
            datasets: Dictionary of extracted datasets
            outputSpecs: Output specifications

        Returns:
            Analysis plan dictionary (the generic fallback plan on any failure)
        """
        # Prepare dataset information
        datasetInfo = {}
        for name, df in datasets.items():
            try:
                datasetInfo[name] = {
                    "shape": df.shape,
                    "columns": df.columns.tolist(),
                    "dtypes": {str(col): str(dtype) for col, dtype in df.dtypes.items()},
                    "sample": df.head(3).to_dict(orient="records"),
                }
            except Exception:
                datasetInfo[name] = {"error": "Could not process dataset"}

        analysisPrompt = f"""
        Analyze this data analysis task and create a plan.

        TASK: {prompt}

        AVAILABLE DATA:
        {self._toJson(datasetInfo)}

        DOCUMENT CONTEXT:
        {context[:1000]}... (truncated)

        OUTPUT REQUIREMENTS:
        {self._toJson(outputSpecs)}

        Create a detailed analysis plan in JSON format with the following structure:
        {{
            "analysisType": "statistical|trend|comparative|predictive|cluster|general",
            "keyQuestions": ["question1", "question2"],
            "recommendedVisualizations": [{{
                "type": "chart_type",
                "dataSource": "dataset_name",
                "variables": ["col1", "col2"],
                "purpose": "explanation"
            }}],
            "keyInsights": "brief summary of initial insights",
            "analysisApproach": "brief description of recommended approach"
        }}

        Only return valid JSON. No preamble or explanations.
        """

        try:
            response = await self.mydom.callAi([
                {"role": "system", "content": "You are a data analysis expert. Respond with valid JSON only."},
                {"role": "user", "content": analysisPrompt}
            ], produceUserAnswer = True)

            # Extract the outermost JSON object from the response
            jsonStart = response.find("{")
            jsonEnd = response.rfind("}") + 1
            if jsonStart >= 0 and jsonEnd > jsonStart:
                plan = json.loads(response[jsonStart:jsonEnd])
                if isinstance(plan, dict):
                    return plan
            # Fallback if no JSON object was found
            return self._fallbackPlan()
        except Exception as e:
            logger.warning(f"Error creating analysis plan: {str(e)}")
            return self._fallbackPlan()

    @staticmethod
    def _stripCodeFences(code: str) -> str:
        """Remove markdown code fences from AI-generated code."""
        return code.replace("```python", "").replace("```", "").strip()

    @staticmethod
    def _bindDatasets(namespace: Dict[str, Any], datasets: Dict[str, pd.DataFrame]) -> Dict[str, List[str]]:
        """
        Bind every dataset into an execution namespace.

        Each dataset is bound under a sanitized version of its name; the first
        two datasets are additionally bound as "df" and "df2" when those names
        are still free.

        Returns:
            Mapping of dataset name to the variable names it is bound to
        """
        boundNames: Dict[str, List[str]] = {}
        for name, df in datasets.items():
            varName = "".join(c if c.isalnum() else "_" for c in name)
            if not varName.isidentifier():
                # e.g. names starting with a digit could never be referenced from code
                varName = f"ds_{varName}"
            namespace[varName] = df
            names = [varName]
            for alias in ("df", "df2"):
                if alias not in namespace:
                    namespace[alias] = df
                    names.append(alias)
                    break
            boundNames[name] = names
        return boundNames

    @staticmethod
    def _describeDataset(name: str, df: pd.DataFrame, variables: List[str]) -> str:
        """Build the prompt section describing one dataset and its variable names."""
        text = f"\nDataset '{name}':\n"
        text += f"- Shape: {df.shape}\n"
        text += f"- Columns: {df.columns.tolist()}\n"
        text += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"
        usable = [v for v in variables if v.isidentifier()]
        if usable:
            text += f"- Available in code as: {', '.join(usable)}\n"
        return text

    def _renderMessageImage(self, message: str, formatType: str, fontsize: int) -> str:
        """Render a centered text message as an image and return it base64 encoded."""
        figure = plt.figure(figsize=(10, 6))
        try:
            plt.text(0.5, 0.5, message, ha="center", va="center", fontsize=fontsize)
            plt.tight_layout()
            return self._getImageBase64(formatType)
        finally:
            plt.close(figure)

    async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str,
                                   analysisPlan: Dict, description: str) -> Dict:
        """
        Create visualization document using AI guidance.

        Args:
            datasets: Dictionary of datasets
            prompt: Original task prompt
            outputLabel: Output filename
            analysisPlan: Analysis plan from AI
            description: Output description

        Returns:
            Visualization document (an image carrying an error message when
            no data is available or the generated code fails)
        """
        # Determine format from filename
        formatType = outputLabel.split(".")[-1].lower()
        if formatType not in _IMAGE_FORMATS:
            formatType = "png"
        contentType = _IMAGE_CONTENT_TYPES[formatType]

        # If no datasets available, create error message image
        if not datasets:
            imgData = self._renderMessageImage("No data available for visualization", formatType, 14)
            return {
                "label": outputLabel,
                "content": imgData,
                "metadata": {"contentType": contentType},
            }

        # Get recommended visualizations from the plan, ignoring malformed entries
        recommendedViz = analysisPlan.get("recommendedVisualizations") or []
        if isinstance(recommendedViz, dict):
            recommendedViz = [recommendedViz]
        elif not isinstance(recommendedViz, list):
            recommendedViz = []
        recommendedViz = [viz for viz in recommendedViz if isinstance(viz, dict)]

        # Use the first dataset if the plan recommends nothing
        if not recommendedViz:
            name, df = next(iter(datasets.items()))
            recommendedViz = [{
                "type": "auto",
                "dataSource": name,
                "variables": df.columns.tolist()[:5],
                "purpose": "general analysis",
            }]

        # Single namespace for the generated code. Passing separate globals and
        # locals to exec() runs the code as if it were a class body, which hides
        # the datasets from any function or lambda the generated code defines.
        namespace: Dict[str, Any] = {"plt": plt, "sns": sns, "pd": pd, "np": np}
        boundNames = self._bindDatasets(namespace, datasets)

        # Describe the recommended data sources; fall back to all datasets when
        # the plan references none that exist.
        sources: List[str] = []
        for viz in recommendedViz:
            dataSource = viz.get("dataSource")
            if isinstance(dataSource, str) and dataSource in datasets and dataSource not in sources:
                sources.append(dataSource)
        if not sources:
            sources = list(datasets)

        # Create visualization code prompt
        vizPrompt = f"""
        Generate Python matplotlib/seaborn code to create a visualization for:

        TASK: {prompt}

        VISUALIZATION REQUIREMENTS:
        - Output format: {formatType}
        - Filename: {outputLabel}
        - Description: {description}

        RECOMMENDED VISUALIZATION:
        {self._toJson(recommendedViz)}

        AVAILABLE DATASETS:
        """
        for name in sources:
            vizPrompt += self._describeDataset(name, datasets[name], boundNames[name])

        vizPrompt += """
        The datasets are already loaded as pandas DataFrames under the variable names listed above.
        plt, sns, pd and np are already imported.

        Generate ONLY Python code that:
        1. Uses matplotlib and/or seaborn to create a clear visualization
        2. Sets figure size to (10, 6)
        3. Includes appropriate titles, labels, and legend
        4. Uses professional color schemes
        5. Handles any missing data gracefully

        Return ONLY executable Python code, no explanations or markdown.
        """

        figuresBefore = None
        try:
            # Get visualization code from AI
            vizCode = await self.mydom.callAi([
                {"role": "system", "content": "You are a data visualization expert. Provide only executable Python code."},
                {"role": "user", "content": vizPrompt}
            ], produceUserAnswer = True)
            vizCode = self._stripCodeFences(vizCode)

            # Remember which figures exist so only figures opened during this
            # rendering are closed afterwards (taken after the await, so no
            # other coroutine can interleave before cleanup).
            figuresBefore = set(plt.get_fignums())
            plt.figure(figsize=(10, 6))

            # SECURITY NOTE: this executes AI-generated code in-process with
            # full interpreter privileges. Run the agent only in an environment
            # where that is acceptable (e.g. a sandboxed worker).
            exec(vizCode, namespace)

            # Capture the image
            imgData = self._getImageBase64(formatType)
        except Exception as e:
            logger.error(f"Error creating visualization: {str(e)}", exc_info=True)
            # Create error message image
            imgData = self._renderMessageImage(f"Visualization error: {str(e)}", formatType, 12)
        finally:
            # Close every figure opened above, including extra figures created
            # by the generated code, so failed or repeated runs do not leak.
            if figuresBefore is not None:
                for figNum in set(plt.get_fignums()) - figuresBefore:
                    plt.close(figNum)

        return self.formatAgentDocumentOutput(outputLabel, imgData, contentType)

    async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str,
                                  analysisPlan: Dict, description: str) -> Dict:
        """
        Create a data document (e.g., CSV, JSON) based on analysis.

        Args:
            datasets: Dictionary of datasets
            prompt: Original task prompt
            outputLabel: Output filename
            analysisPlan: Analysis plan from AI
            description: Output description

        Returns:
            Data document (a plain-text error document when no data is
            available or the generated code fails)
        """
        # Determine format from filename
        formatType = outputLabel.split(".")[-1].lower()

        # If no datasets available, return error message
        if not datasets:
            return {
                "label": outputLabel,
                "content": f"No data available for processing into {formatType} format.",
                "metadata": {"contentType": "text/plain"},
            }

        # Single namespace for the generated code (see _createVisualization).
        namespace: Dict[str, Any] = {"pd": pd, "np": np, "io": io, "json": json}
        boundNames = self._bindDatasets(namespace, datasets)

        # Generate data processing instructions
        dataPrompt = f"""
        Create Python code to process datasets and generate a {formatType} file for:

        TASK: {prompt}

        OUTPUT REQUIREMENTS:
        - Format: {formatType}
        - Filename: {outputLabel}
        - Description: {description}

        ANALYSIS CONTEXT:
        {self._toJson(analysisPlan)}

        AVAILABLE DATASETS:
        """
        for name, df in datasets.items():
            dataPrompt += self._describeDataset(name, df, boundNames[name])

        dataPrompt += """
        The datasets are already loaded as pandas DataFrames under the variable names listed above.
        pd, np, io and json are already imported.

        Generate Python code that:
        1. Processes the available dataset(s)
        2. Performs necessary transformations, aggregations, or calculations
        3. Outputs the result in the requested format
        4. Returns the content as a string variable named 'result'

        Return ONLY executable Python code, no explanations or markdown.
        """

        try:
            # Get data processing code from AI
            dataCode = await self.mydom.callAi([
                {"role": "system", "content": "You are a data processing expert. Provide only executable Python code."},
                {"role": "user", "content": dataPrompt}
            ], produceUserAnswer = True)
            dataCode = self._stripCodeFences(dataCode)

            # SECURITY NOTE: this executes AI-generated code in-process with
            # full interpreter privileges. Run the agent only in an environment
            # where that is acceptable (e.g. a sandboxed worker).
            exec(dataCode, namespace)

            # Get the result
            result = namespace.get("result", "No output was generated.")

            contentType = _DATA_CONTENT_TYPES.get(formatType, "text/plain")
            return self.formatAgentDocumentOutput(outputLabel, result, contentType)

        except Exception as e:
            logger.error(f"Error creating data document: {str(e)}", exc_info=True)
            return {
                "label": outputLabel,
                "content": f"Error generating {formatType} document: {str(e)}",
                "metadata": {"contentType": "text/plain"},
            }

    @staticmethod
    def _summarizeDataset(name: str, df: pd.DataFrame) -> str:
        """Build a short textual summary (shape, columns, basic stats) of a dataset."""
        summary = f"Dataset: {name}\n"
        summary += f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
        # Column labels are not guaranteed to be strings (e.g. integer headers)
        summary += f"- Columns: {', '.join(str(col) for col in df.columns)}\n"

        # Basic statistics for numeric columns
        numericCols = df.select_dtypes(include=["number"]).columns
        if len(numericCols) > 0:
            summary += "- Numeric Columns Stats:\n"
            for col in numericCols[:3]:  # Limit to first 3
                stats = df[col].describe()
                summary += f"  - {col}: min={stats['min']:.2f}, max={stats['max']:.2f}, mean={stats['mean']:.2f}\n"
        return summary

    async def _createTextDocument(self, datasets: Dict, context: str, prompt: str,
                                  outputLabel: str, formatType: str, analysisPlan: Dict,
                                  description: str) -> Dict:
        """
        Create a text document (report, analysis, etc.) based on analysis.

        Args:
            datasets: Dictionary of datasets
            context: Document context text
            prompt: Original task prompt
            outputLabel: Output filename
            formatType: Output format type
            analysisPlan: Analysis plan from AI
            description: Output description

        Returns:
            Text document (an error document in the requested format when
            generation fails)
        """
        # Create dataset summaries; one unusual dataset must not abort the report
        datasetSummaries = []
        for name, df in datasets.items():
            try:
                datasetSummaries.append(self._summarizeDataset(name, df))
            except Exception as e:
                logger.warning(f"Could not summarize dataset '{name}': {str(e)}")
                datasetSummaries.append(f"Dataset: {name}\n")

        # Determine content type based on format
        contentType = _TEXT_CONTENT_TYPES.get(formatType, "text/plain")

        # Generate analysis prompt
        analysisPrompt = f"""
        Create a detailed {formatType} document for:

        TASK: {prompt}

        OUTPUT REQUIREMENTS:
        - Format: {formatType}
        - Filename: {outputLabel}
        - Description: {description}

        ANALYSIS CONTEXT:
        {self._toJson(analysisPlan)}

        DATASET SUMMARIES:
        {"".join(datasetSummaries)}

        DOCUMENT CONTEXT:
        {context[:2000]}... (truncated)

        Create a comprehensive, professional analysis document that addresses the task requirements.
        The document should:
        1. Have a clear structure with headings and sections
        2. Include relevant data findings and insights
        3. Provide appropriate interpretations and recommendations
        4. Format the content according to the required output format

        Your response should be the complete document content in the specified format.
        """

        try:
            # Get document content from AI
            documentContent = await self.mydom.callAi([
                {"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."},
                {"role": "user", "content": analysisPrompt}
            ], produceUserAnswer = True)

            # Clean HTML or Markdown if needed
            if formatType in ["md", "markdown"] and not documentContent.strip().startswith("#"):
                documentContent = f"# Analysis Report\n\n{documentContent}"
            elif formatType == "html" and "<html" not in documentContent.lower():
                # NOTE(review): the wrapper markup was lost from the checked-in
                # source and has been reconstructed - confirm the exact tags.
                documentContent = f"<html><body>{documentContent}</body></html>"

            return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType)

        except Exception as e:
            logger.error(f"Error creating text document: {str(e)}", exc_info=True)

            # Create a simple error document
            if formatType in ["md", "markdown"]:
                content = f"# Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
            elif formatType == "html":
                # NOTE(review): markup reconstructed (see above). The exception
                # text is escaped so it cannot break the generated HTML.
                content = (
                    "<html><body><p>There was an error generating the analysis: "
                    f"{html.escape(str(e))}</p></body></html>"
                )
            else:
                content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"

            return {
                "label": outputLabel,
                "content": content,
                "metadata": {"contentType": contentType},
            }

    def _getImageBase64(self, formatType: str = 'png') -> str:
        """
        Convert current matplotlib figure to base64 string.

        Args:
            formatType: Image format

        Returns:
            Base64 encoded string of the image
        """
        # The context manager releases the buffer even if savefig raises
        with io.BytesIO() as buffer:
            plt.savefig(buffer, format=formatType, dpi=100)
            return base64.b64encode(buffer.getvalue()).decode("utf-8")


# Factory function for the Analyst agent
def getAgentAnalyst():
    """Returns an instance of the Analyst agent."""
    return AgentAnalyst()