"""
Data analyst agent for analysis and interpretation of data.
Focuses on output-first design with AI-powered analysis.
"""

import base64
import hashlib
import html
import importlib.util
import inspect
import io
import json
import logging
import os
import re
import shutil
import sys
import time
import traceback
import uuid
from datetime import datetime, UTC
from pathlib import Path
from typing import Dict, Any, List, Optional

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pydantic import BaseModel

from modules.workflow.agentBase import AgentBase
from modules.interfaces.serviceChatModel import (
    ChatContent, ChatMessage, ChatStat, AgentResponse, AgentHandover
)

logger = logging.getLogger(__name__)


class AgentAnalyst(AgentBase):
    """
    AI-driven agent for data analysis and visualization.

    NOTE(review): `self.service`, `setService`, `formatAgentDocumentOutput`,
    `_convertToCsv` and `_convertToExcel` are not defined in this file; they are
    presumably provided by `AgentBase` - confirm.
    """

    def __init__(self):
        """Initialize the data analysis agent"""
        super().__init__()
        self.name = "analyst"
        self.label = "Data Analysis"
        self.description = "Analyzes data using AI-powered insights and visualizations, produce diagrams and visualizations"
        self.capabilities = [
            "dataAnalysis",
            "statistics",
            "visualization",
            "dataInterpretation",
            "reportGeneration"
        ]

        # Set default visualization settings. The style name only exists in
        # newer Matplotlib releases, so a missing style must not prevent the
        # agent from being constructed.
        style = 'seaborn-v0_8-whitegrid'
        if style in plt.style.available:
            plt.style.use(style)
        else:
            logger.warning("Matplotlib style '%s' is not available; using the default style", style)

    def setDependencies(self, serviceBase=None):
        """Set external dependencies for the agent."""
        self.setService(serviceBase)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parseJsonSpan(response: str, opener: str = "{", closer: str = "}") -> Any:
        """
        Parse the JSON value spanning the first `opener` to the last `closer`.

        AI responses are often wrapped in markdown fences or explanatory text,
        so only the outermost JSON span is parsed.

        Args:
            response: Raw AI response text
            opener: Opening delimiter of the expected JSON value
            closer: Closing delimiter of the expected JSON value

        Returns:
            The parsed JSON value, or None when no span is found

        Raises:
            json.JSONDecodeError: If the span is not valid JSON
        """
        start = response.find(opener)
        end = response.rfind(closer) + 1
        if start < 0 or end <= start:
            return None
        return json.loads(response[start:end])

    @staticmethod
    def _genericExtractionPrompt(doc: Dict[str, Any], prompt: str) -> str:
        """Build the generic extraction prompt used when no specific one is available."""
        return f"Extract all relevant information from {doc.get('name')} that relates to: {prompt}"

    @staticmethod
    def _summarizeDatasets(datasets: Dict[str, Any]) -> Dict[str, Any]:
        """
        Build a JSON-serializable summary (shape and columns) of the datasets.

        DataFrames cannot be passed to json.dumps directly; values that are not
        DataFrames are passed through unchanged.
        """
        summary = {}
        for name, data in (datasets or {}).items():
            if isinstance(data, pd.DataFrame):
                summary[str(name)] = {
                    "shape": list(data.shape),
                    "columns": [str(column) for column in data.columns]
                }
            else:
                summary[str(name)] = data
        return summary

    @staticmethod
    def _datasetsToRecords(datasets: Dict[str, Any]) -> Dict[str, Any]:
        """
        Convert datasets into plain JSON-compatible records.

        The round trip through DataFrame.to_json turns NaN into null and
        timestamps into ISO strings, which json.dumps cannot do on its own.
        """
        records = {}
        for name, data in (datasets or {}).items():
            if isinstance(data, pd.DataFrame):
                records[str(name)] = json.loads(data.to_json(orient="records", date_format="iso"))
            else:
                records[str(name)] = data
        return records

    @staticmethod
    def _fallbackAnalysisPlan(prompt: str, datasetNames: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Build the generic plan used when the AI does not deliver a usable one.

        Args:
            prompt: The task prompt (quoted in the feedback text)
            datasetNames: When given, every plan entry lists these datasets (shape
                used by _analyzeTask). When None, the plan carries
                "requiresAnalysis" instead (shape used by _createAnalysisPlan).

        Returns:
            Analysis plan dictionary
        """
        step = {
            "step": "Basic data analysis",
            "purpose": "Understand the data structure and content"
        }
        visualization = {
            "type": "basic charts",
            "purpose": "Show data distribution and relationships"
        }
        insight = {
            "type": "basic insights",
            "description": "Key findings from the data"
        }
        plan: Dict[str, Any] = {}

        if datasetNames is None:
            plan["requiresAnalysis"] = True
        else:
            step["datasets"] = list(datasetNames)
            visualization["datasets"] = list(datasetNames)
            insight["datasets"] = list(datasetNames)

        step["techniques"] = ["summary statistics", "data visualization"]
        step["outputs"] = ["summary report", "basic visualizations"]
        visualization["settings"] = {}

        plan["analysisSteps"] = [step]
        plan["visualizations"] = [visualization]
        plan["insights"] = [insight]
        plan["feedback"] = f"I'll analyze the data and provide insights about {prompt}"
        return plan

    # ------------------------------------------------------------------
    # Main task flow
    # ------------------------------------------------------------------

    async def processTask(self, handover: AgentHandover) -> AgentResponse:
        """
        Process a task by focusing on required outputs and using AI to guide the analysis process.

        Args:
            handover: AgentHandover object containing task information

        Returns:
            AgentResponse object with execution results
        """
        try:
            # 1. Initial Analysis & Planning
            self.service.logAdd(handover.workflowId, "Starting analysis task...", level="info", progress=10)

            # Generate extraction prompts for each file
            extraction_prompts = await self._generateExtractionPrompts(
                prompt=handover.promptUserInitial,
                documents=handover.documentsUserInitial
            )

            # 2. Content Extraction with specific prompts
            # (documents are processed one after another)
            self.service.logAdd(handover.workflowId, "Extracting content from documents...", level="info", progress=20)
            extracted_contents = []
            for doc, extraction_prompt in zip(handover.documentsUserInitial, extraction_prompts):
                # Use document service for extraction with specific prompt
                content_result = await self.service.document.contentWithPrompt(doc, extraction_prompt)
                if content_result:
                    extracted_contents.append({
                        "document": doc,
                        "content": content_result["content"],
                        "metadata": content_result["metadata"],
                        "extraction_prompt": extraction_prompt
                    })

            # 3. Analysis & Reflection
            self.service.logAdd(handover.workflowId, "Analyzing extracted content...", level="info", progress=50)
            analysis_results = await self._analyzeContent(
                prompt=handover.promptUserInitial,
                extracted_contents=extracted_contents
            )

            # 4. Response Generation & Handover Update
            self.service.logAdd(handover.workflowId, "Generating response...", level="info", progress=80)

            # Create ChatMessage with results
            response_message = ChatMessage(
                id=str(uuid.uuid4()),
                workflowId=handover.workflowId,
                agentName=self.name,
                message=analysis_results.get("feedback", ""),
                role="assistant",
                status="completed",
                sequenceNr=handover.sequenceNr,
                startedAt=handover.startedAt,
                finishedAt=datetime.now(UTC).isoformat(),
                success=True,
                documents=analysis_results.get("documents", []),
                stats=ChatStat(
                    processingTime=analysis_results.get("processing_time"),
                    tokenCount=analysis_results.get("token_count"),
                    successRate=1.0
                )
            )

            # Update handover object
            handover.status = "success"
            handover.progress = 100.0
            handover.finishedAt = datetime.now(UTC).isoformat()
            handover.documentsOutput = analysis_results.get("documents", [])
            handover.promptFromFinishedAgent = analysis_results.get("feedback", "")

            return AgentResponse(
                success=True,
                message=response_message,
                performance=analysis_results.get("performance", {}),
                progress=100.0
            )

        except Exception as e:
            logger.error(f"Error in analysis task: {str(e)}", exc_info=True)

            # Create error response
            error_message = ChatMessage(
                id=str(uuid.uuid4()),
                workflowId=handover.workflowId,
                agentName=self.name,
                message=f"Error during analysis: {str(e)}",
                role="system",
                status="error",
                sequenceNr=handover.sequenceNr,
                startedAt=handover.startedAt,
                finishedAt=datetime.now(UTC).isoformat(),
                success=False
            )

            # Update handover with error
            handover.status = "failed"
            handover.error = str(e)
            handover.finishedAt = datetime.now(UTC).isoformat()

            return AgentResponse(
                success=False,
                message=error_message,
                performance={},
                progress=0.0
            )

    async def _generateExtractionPrompts(self, prompt: str, documents: List[Dict[str, Any]]) -> List[str]:
        """
        Generate specific extraction prompts for each document.

        Documents for which the AI delivers no usable prompt get a generic one;
        if the AI call or the parsing fails, all documents get the generic prompt.

        Args:
            prompt: The original user prompt
            documents: List of documents to process

        Returns:
            List of extraction prompts, one for each document
        """
        try:
            documentInfo = json.dumps([{
                "name": doc.get("name", ""),
                "type": doc.get("type", ""),
                "size": doc.get("size", 0)
            } for doc in documents], indent=2)

            # Create prompt for AI to generate extraction prompts
            prompt_generation = f"""
            Generate specific extraction prompts for each document based on the user's request.

            USER REQUEST: {prompt}

            DOCUMENTS:
            {documentInfo}

            For each document, generate a specific extraction prompt that will help extract the most relevant information.
            Consider:
            1. The document type and format
            2. The user's original request
            3. What specific information would be most useful

            Return a JSON array of prompts, one for each document:
            [
                {{
                    "document_name": "name of the document",
                    "extraction_prompt": "specific prompt for this document"
                }}
            ]
            """

            # Get AI's response
            response = await self.service.base.callAi([
                {"role": "system", "content": "You are an expert at creating precise document extraction prompts."},
                {"role": "user", "content": prompt_generation}
            ])

            # Parse response; tolerate markdown fences or text around the array
            prompts_data = self._parseJsonSpan(response, "[", "]")
            if not isinstance(prompts_data, list):
                raise ValueError("AI response does not contain a JSON array of prompts")

            # Index prompts by document name (first entry wins); malformed
            # entries are skipped instead of discarding every prompt
            promptsByName: Dict[str, str] = {}
            for entry in prompts_data:
                if not isinstance(entry, dict):
                    continue
                name = entry.get("document_name")
                if isinstance(name, str) and "extraction_prompt" in entry:
                    promptsByName.setdefault(name, entry["extraction_prompt"])

            # Map prompts to documents
            extraction_prompts = []
            for doc in documents:
                docName = doc.get("name")
                if isinstance(docName, str) and docName in promptsByName:
                    extraction_prompts.append(promptsByName[docName])
                else:
                    extraction_prompts.append(self._genericExtractionPrompt(doc, prompt))

            return extraction_prompts

        except Exception as e:
            logger.error(f"Error generating extraction prompts: {str(e)}")
            # Fallback to generic prompts
            return [self._genericExtractionPrompt(doc, prompt) for doc in documents]

    async def _analyzeContent(self, prompt: str, extracted_contents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze the extracted content and generate results.

        Args:
            prompt: The original user prompt
            extracted_contents: List of extracted content with metadata

        Returns:
            Dictionary containing analysis results. On failure the lists are
            empty and "feedback" carries the error text.
        """
        try:
            # default=str keeps prompt building alive when the extracted content
            # holds values json cannot serialize natively
            contentInfo = json.dumps([{
                "document": content["document"].get("name", ""),
                "content": content["content"],
                "extraction_prompt": content["extraction_prompt"]
            } for content in extracted_contents], indent=2, default=str)

            # Create analysis prompt
            analysis_prompt = f"""
            Analyze the following extracted content and provide insights based on the user's request.

            USER REQUEST: {prompt}

            EXTRACTED CONTENT:
            {contentInfo}

            Provide a comprehensive analysis that:
            1. Synthesizes information from all documents
            2. Identifies key insights and patterns
            3. Relates findings to the user's request
            4. Suggests potential visualizations or additional analysis

            Format your response as a JSON object with:
            {{
                "insights": ["list of key insights"],
                "patterns": ["list of identified patterns"],
                "recommendations": ["list of recommendations"],
                "visualizations": ["list of suggested visualizations"],
                "feedback": "summary of findings"
            }}
            """

            # Get AI's analysis
            response = await self.service.base.callAi([
                {"role": "system", "content": "You are an expert data analyst."},
                {"role": "user", "content": analysis_prompt}
            ])

            # Parse and return results; callers use .get(), so only a JSON
            # object is acceptable
            results = self._parseJsonSpan(response)
            if not isinstance(results, dict):
                raise ValueError("AI response does not contain a JSON object")
            return results

        except Exception as e:
            logger.error(f"Error analyzing content: {str(e)}")
            return {
                "insights": [],
                "patterns": [],
                "recommendations": [],
                "visualizations": [],
                "feedback": f"Error during analysis: {str(e)}"
            }

    def _extractData(self, documents: List[Dict[str, Any]]) -> tuple:
        """
        Extract data from documents, focusing on dataExtracted fields.

        Parsing is best effort: text that cannot be parsed as a table is still
        added to the document context, it just produces no dataset.

        Args:
            documents: List of input documents

        Returns:
            Tuple of (datasets dictionary, document context text)
        """
        datasets = {}
        contextParts = []

        # Process each document
        for doc in documents:
            docName = doc.get("name", "unnamed")
            if doc.get("ext"):
                docName = f"{docName}.{doc.get('ext')}"

            contextParts.append(f"\n\n--- {docName} ---\n")
            lowerName = docName.lower()

            # Process contents
            for content in doc.get("contents", []):
                # Focus only on dataExtracted
                extractedText = content.get("dataExtracted")
                if not extractedText:
                    continue

                contextParts.append(extractedText)

                # Try to parse as structured data if appropriate
                if lowerName.endswith(('.csv', '.tsv')):
                    # TSV files are tab separated; the comma default would
                    # collapse every row into a single column
                    separator = '\t' if lowerName.endswith('.tsv') else ','
                    try:
                        datasets[docName] = pd.read_csv(io.StringIO(extractedText), sep=separator)
                    except Exception as e:
                        logger.debug(f"Could not parse {docName} as delimited data: {str(e)}")
                elif lowerName.endswith('.json'):
                    try:
                        jsonData = json.loads(extractedText)
                        if isinstance(jsonData, list):
                            datasets[docName] = pd.DataFrame(jsonData)
                        elif isinstance(jsonData, dict):
                            # Handle nested JSON structures
                            if any(isinstance(v, list) for v in jsonData.values()):
                                for key, value in jsonData.items():
                                    if isinstance(value, list) and len(value) > 0:
                                        datasets[f"{docName}:{key}"] = pd.DataFrame(value)
                            else:
                                datasets[docName] = pd.DataFrame([jsonData])
                    except Exception as e:
                        logger.debug(f"Could not parse {docName} as JSON data: {str(e)}")

                # Try to detect tabular data in text content
                lines = extractedText.splitlines()
                if docName not in datasets and len(lines) > 2:
                    if any(',' in line for line in lines[:5]):
                        separator = ','
                    elif any('\t' in line for line in lines[:5]):
                        separator = '\t'
                    else:
                        separator = None

                    if separator is not None:
                        try:
                            df = pd.read_csv(io.StringIO(extractedText), sep=separator)
                            # A single column means the text was not really tabular
                            if len(df.columns) > 1:
                                datasets[docName] = df
                        except Exception as e:
                            logger.debug(f"No tabular data detected in {docName}: {str(e)}")

        return datasets, "".join(contextParts)

    async def _analyzeTask(self, prompt: str, documentContext: str, datasets: Dict[str, Any], outputSpecs: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Analyze the task requirements using AI.

        Args:
            prompt: The task prompt
            documentContext: Context from input documents
            datasets: Available datasets
            outputSpecs: Output specifications

        Returns:
            Analysis plan dictionary (a generic fallback plan if the AI delivers
            no usable plan)
        """
        try:
            # DataFrames are not JSON serializable, so only a summary of each
            # dataset is put into the prompt
            datasetInfo = json.dumps(self._summarizeDatasets(datasets), indent=2, default=str)
            outputInfo = json.dumps(outputSpecs, indent=2, default=str)

            # Create analysis prompt
            analysisPrompt = f"""
            Analyze this data analysis task and create a detailed plan:

            TASK: {prompt}

            DOCUMENT CONTEXT:
            {documentContext}

            AVAILABLE DATASETS:
            {datasetInfo}

            REQUIRED OUTPUTS:
            {outputInfo}

            Create a detailed analysis plan in JSON format with:
            {{
                "analysisSteps": [
                    {{
                        "step": "step description",
                        "purpose": "why this step is needed",
                        "datasets": ["dataset1", "dataset2"],
                        "techniques": ["technique1", "technique2"],
                        "outputs": ["output1", "output2"]
                    }}
                ],
                "visualizations": [
                    {{
                        "type": "visualization type",
                        "purpose": "what it shows",
                        "datasets": ["dataset1"],
                        "settings": {{"key": "value"}}
                    }}
                ],
                "insights": [
                    {{
                        "type": "insight type",
                        "description": "what to look for",
                        "datasets": ["dataset1"]
                    }}
                ],
                "feedback": "explanation of the analysis approach"
            }}

            Respond with ONLY the JSON object, no additional text or explanations.
            """

            # Get analysis plan from AI
            response = await self.service.base.callAi([
                {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. Respond with valid JSON only."},
                {"role": "user", "content": analysisPrompt}
            ], produceUserAnswer=True)

            # Extract JSON
            plan = self._parseJsonSpan(response)
            if plan is not None:
                return plan

            # Fallback plan
            logger.warning("Not able creating analysis plan, generating fallback plan")
        except Exception as e:
            logger.warning(f"Error creating analysis plan: {str(e)}")

        # Simple fallback plan
        return self._fallbackAnalysisPlan(prompt, list(datasets.keys()))

    async def _createAnalysisPlan(self, prompt: str) -> Dict[str, Any]:
        """
        Create an analysis plan based on the task prompt.

        Args:
            prompt: The task prompt

        Returns:
            Analysis plan dictionary (a generic fallback plan if the AI delivers
            no usable plan)
        """
        try:
            # Create analysis prompt
            analysisPrompt = f"""
            Analyze this data analysis task and create a detailed plan:

            TASK: {prompt}

            Create a detailed analysis plan in JSON format with:
            {{
                "requiresAnalysis": true/false,
                "analysisSteps": [
                    {{
                        "step": "step description",
                        "purpose": "why this step is needed",
                        "techniques": ["technique1", "technique2"],
                        "outputs": ["output1", "output2"]
                    }}
                ],
                "visualizations": [
                    {{
                        "type": "visualization type",
                        "purpose": "what it shows",
                        "settings": {{"key": "value"}}
                    }}
                ],
                "insights": [
                    {{
                        "type": "insight type",
                        "description": "what to look for"
                    }}
                ],
                "feedback": "explanation of the analysis approach"
            }}

            Respond with ONLY the JSON object, no additional text or explanations.
            """

            # Get analysis plan from AI
            response = await self.service.base.callAi([
                {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. Respond with valid JSON only."},
                {"role": "user", "content": analysisPrompt}
            ], produceUserAnswer=True)

            # Extract JSON
            plan = self._parseJsonSpan(response)
            if plan is not None:
                return plan

            # Fallback plan
            logger.warning("Not able creating analysis plan, generating fallback plan")
        except Exception as e:
            logger.warning(f"Error creating analysis plan: {str(e)}")

        # Simple fallback plan
        return self._fallbackAnalysisPlan(prompt)

    # ------------------------------------------------------------------
    # Output creation
    # ------------------------------------------------------------------

    def _renderMessageImage(self, message: str, formatType: str, fontsize: int = 12) -> str:
        """
        Render a plain text message as an image.

        Args:
            message: Text shown in the middle of the image
            formatType: Image format
            fontsize: Font size of the message

        Returns:
            Base64 encoded string of the image
        """
        plt.figure(figsize=(10, 6))
        try:
            plt.text(0.5, 0.5, message, ha='center', va='center', fontsize=fontsize)
            plt.tight_layout()
            return self._getImageBase64(formatType)
        finally:
            plt.close()

    def _executeVisualizationCode(self, vizCode: str, datasets: Dict, formatType: str) -> str:
        """
        Execute AI-generated plotting code and capture the resulting figure.

        SECURITY: `vizCode` is model output and is executed without sandboxing.
        It must only ever be derived from trusted prompts and data.

        Args:
            vizCode: Python source that draws onto the current matplotlib figure
            datasets: Dictionary of datasets made available to the code
            formatType: Image format

        Returns:
            Base64 encoded string of the image

        Raises:
            Exception: Whatever the generated code raises
        """
        import numpy as np

        # Make local variables available to the code
        localVars = {
            "plt": plt,
            "sns": sns,
            "pd": pd,
            "np": np
        }

        # Add datasets to local variables
        for name, df in datasets.items():
            # Create a sanitized variable name
            varName = ''.join(c if c.isalnum() else '_' for c in name)
            localVars[varName] = df

            # Also add with standard names for simpler code
            if "df" not in localVars:
                localVars["df"] = df
            elif "df2" not in localVars:
                localVars["df2"] = df

        # A single namespace is required: with separate globals/locals, names
        # such as `df` are invisible inside functions and lambdas defined by
        # the generated code. Copying globals() keeps the previously available
        # module names without letting the code modify this module.
        namespace = {**globals(), **localVars}

        figuresBefore = set(plt.get_fignums())
        plt.figure(figsize=(10, 6))
        try:
            exec(vizCode, namespace)
            # Capture the image
            return self._getImageBase64(formatType)
        finally:
            # Close every figure opened during execution (also those the
            # generated code created itself) so failed runs do not leak figures
            for figureNumber in set(plt.get_fignums()) - figuresBefore:
                plt.close(figureNumber)

    async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str, analysisPlan: Dict, description: str) -> Dict:
        """
        Create a visualization based on the analysis plan.

        On any failure an image containing the error message is returned instead.

        Args:
            datasets: Dictionary of datasets
            prompt: Original task prompt
            outputLabel: Output file label
            analysisPlan: Analysis plan
            description: Output description

        Returns:
            Document dictionary with visualization
        """
        # Determine format from filename. Resolved before the try block because
        # the error handler needs it as well.
        formatType = outputLabel.split('.')[-1].lower()
        if formatType not in ['png', 'jpg', 'jpeg', 'svg']:
            formatType = 'png'

        try:
            # If no datasets available, create error message image. Checked
            # first so no AI call is spent on a plot that cannot be drawn.
            if not datasets:
                imgData = self._renderMessageImage("No data available for visualization", formatType, fontsize=14)
                # NOTE(review): this branch returns a raw dict while all other
                # branches use formatAgentDocumentOutput - confirm this is intended.
                return {
                    "label": outputLabel,
                    "content": imgData,
                    "metadata": {
                        "contentType": f"image/{formatType}"
                    }
                }

            # Get visualization recommendations
            vizRecommendations = analysisPlan.get("visualizations", [])

            if not vizRecommendations:
                # Generate visualization recommendations if none provided
                # NOTE(review): processTask logs via self.service.logAdd, this
                # call uses self.service.base.logAdd - confirm which is correct.
                self.service.base.logAdd(analysisPlan.get("workflowId"), "Generating visualization recommendations...", level="info", progress=50)

                datasetInfo = json.dumps(self._summarizeDatasets(datasets), indent=2, default=str)
                vizPrompt = f"""
                Based on this data and task, recommend appropriate visualizations.

                TASK: {prompt}
                DESCRIPTION: {description}

                DATASETS:
                {datasetInfo}

                Recommend visualizations in JSON format:
                {{
                    "visualizations": [
                        {{
                            "type": "chart_type",
                            "dataSource": "dataset_name",
                            "variables": ["col1", "col2"],
                            "purpose": "explanation"
                        }}
                    ]
                }}
                """

                response = await self.service.base.callAi([
                    {"role": "system", "content": "You are a data visualization expert. Recommend appropriate visualizations based on the data and task."},
                    {"role": "user", "content": vizPrompt}
                ])

                # Extract JSON
                vizData = self._parseJsonSpan(response)
                if vizData is not None:
                    vizRecommendations = vizData.get("visualizations", [])

            # Prepare dataset info for the first dataset if none specified
            if not vizRecommendations:
                name, df = next(iter(datasets.items()))
                vizRecommendations = [{
                    "type": "auto",
                    "dataSource": name,
                    "variables": df.columns.tolist()[:5],
                    "purpose": "general analysis"
                }]

            # Create visualization code prompt
            vizPrompt = f"""
            Generate Python matplotlib/seaborn code to create a visualization for:

            TASK: {prompt}

            VISUALIZATION REQUIREMENTS:
            - Output format: {formatType}
            - Filename: {outputLabel}
            - Description: {description}

            RECOMMENDED VISUALIZATION:
            {json.dumps(vizRecommendations, indent=2)}

            AVAILABLE DATASETS:
            """

            # Add dataset info for recommended sources
            for viz in vizRecommendations:
                dataSource = viz.get("dataSource")
                if dataSource in datasets:
                    df = datasets[dataSource]
                    vizPrompt += f"\nDataset '{dataSource}':\n"
                    vizPrompt += f"- Shape: {df.shape}\n"
                    vizPrompt += f"- Columns: {df.columns.tolist()}\n"
                    vizPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"

            vizPrompt += """
            Generate ONLY Python code that:
            1. Uses matplotlib and/or seaborn to create a clear visualization
            2. Sets figure size to (10, 6)
            3. Includes appropriate titles, labels, and legend
            4. Uses professional color schemes
            5. Handles any missing data gracefully

            Return ONLY executable Python code, no explanations or markdown.
            """

            # Get visualization code from AI
            vizCode = await self.service.base.callAi([
                {"role": "system", "content": "You are a data visualization expert. Provide only executable Python code."},
                {"role": "user", "content": vizPrompt}
            ], produceUserAnswer=True)

            # Clean code
            vizCode = vizCode.replace("```python", "").replace("```", "").strip()

            # Execute the visualization code and capture the image
            imgData = self._executeVisualizationCode(vizCode, datasets, formatType)

            return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")

        except Exception as e:
            logger.error(f"Error creating visualization: {str(e)}", exc_info=True)

            # Create error message image
            imgData = self._renderMessageImage(f"Visualization error: {str(e)}", formatType)

            return self.formatAgentDocumentOutput(outputLabel, imgData, f"image/{formatType}")

    async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str, analysisPlan: Dict, description: str) -> ChatContent:
        """
        Create a data document (CSV, JSON, Excel) from analysis results.

        Args:
            datasets: Dictionary of datasets
            prompt: Original task prompt
            outputLabel: Output filename
            analysisPlan: Analysis plan
            description: Output description

        Returns:
            ChatContent object
        """
        # Determine format from filename (outside the try block, because the
        # error handler reports the format as well)
        formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "csv"

        try:
            # Process data based on format
            if formatType == "csv":
                result = self._convertToCsv(datasets)
            elif formatType == "json":
                # DataFrames must be converted to records before json.dumps
                result = json.dumps(self._datasetsToRecords(datasets), indent=2, default=str)
            elif formatType == "xlsx":
                result = self._convertToExcel(datasets)
            else:
                result = str(datasets)

            # Determine content type
            contentTypes = {
                "csv": "text/csv",
                "json": "application/json",
                "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
            }
            contentType = contentTypes.get(formatType, "text/plain")

            return self.formatAgentDocumentOutput(outputLabel, result, contentType)

        except Exception as e:
            logger.error(f"Error creating data document: {str(e)}", exc_info=True)
            errorContent = f"Error generating {formatType} document: {str(e)}"
            return self.formatAgentDocumentOutput(outputLabel, errorContent, "text/plain")

    async def _createTextDocument(self, datasets: Dict, context: str, prompt: str, outputLabel: str, formatType: str, analysisPlan: Dict, description: str) -> ChatContent:
        """
        Create a text document (markdown, HTML, text) from analysis results.

        Args:
            datasets: Dictionary of datasets
            context: Document context
            prompt: Original task prompt
            outputLabel: Output filename
            formatType: Output format
            analysisPlan: Analysis plan
            description: Output description

        Returns:
            ChatContent object
        """
        # Determine content type up front: the error handler needs it too, and
        # it used to be unbound whenever the AI call failed
        if formatType in ["md", "markdown"]:
            contentType = "text/markdown"
        elif formatType == "html":
            contentType = "text/html"
        else:
            contentType = "text/plain"

        try:
            # Generate dataset summaries
            datasetSummaries = []
            for name, df in datasets.items():
                summary = f"\nDataset: {name}\n"
                summary += f"Shape: {df.shape}\n"
                # Column labels are not necessarily strings
                summary += f"Columns: {', '.join(map(str, df.columns))}\n"

                if not df.empty:
                    summary += f"Sample data:\n{df.head(3).to_string()}\n"

                datasetSummaries.append(summary)

            # Generate analysis prompt
            analysisPrompt = f"""
            Create a detailed {formatType} document for:

            TASK: {prompt}

            OUTPUT REQUIREMENTS:
            - Format: {formatType}
            - Filename: {outputLabel}
            - Description: {description}

            ANALYSIS CONTEXT:
            {json.dumps(analysisPlan, indent=2, default=str)}

            DATASET SUMMARIES:
            {"".join(datasetSummaries)}

            DOCUMENT CONTEXT:
            {(context or "")[:2000]}... (truncated)

            Create a comprehensive, professional analysis document that addresses the task requirements.
            The document should:
            1. Have a clear structure with headings and sections
            2. Include relevant data findings and insights
            3. Provide appropriate interpretations and recommendations
            4. Format the content according to the required output format

            Your response should be the complete document content in the specified format.
            """

            # Get document content from AI
            documentContent = await self.service.base.callAi([
                {"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."},
                {"role": "user", "content": analysisPrompt}
            ], produceUserAnswer=True)

            # Clean HTML or Markdown if needed
            if formatType in ["md", "markdown"] and not documentContent.strip().startswith("#"):
                documentContent = f"# Analysis Report\n\n{documentContent}"
            elif formatType == "html" and "<html" not in documentContent.lower():
                # NOTE(review): the markup of this wrapper was lost in the source
                # and has been reconstructed - confirm against the repository.
                documentContent = f"<html><body>{documentContent}</body></html>"

            return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType)

        except Exception as e:
            logger.error(f"Error creating text document: {str(e)}", exc_info=True)

            # Create a simple error document
            if formatType in ["md", "markdown"]:
                content = f"# Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
            elif formatType == "html":
                # NOTE(review): markup reconstructed (see above). The error text
                # is escaped so it cannot break the document.
                content = (
                    "<html><body><h1>Error in Analysis</h1>"
                    f"<p>There was an error generating the analysis: {html.escape(str(e))}</p>"
                    "</body></html>"
                )
            else:
                content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"

            return self.formatAgentDocumentOutput(outputLabel, content, contentType)

    def _getImageBase64(self, formatType: str = 'png') -> str:
        """
        Convert current matplotlib figure to base64 string.

        Args:
            formatType: Image format

        Returns:
            Base64 encoded string of the image
        """
        with io.BytesIO() as buffer:
            plt.savefig(buffer, format=formatType, dpi=100)
            # Convert to base64
            return base64.b64encode(buffer.getvalue()).decode('utf-8')

    async def _analyzeData(self, task: Dict[str, Any], analysisPlan: Dict[str, Any]) -> Dict[str, Any]:
        """
        Analyze data based on the analysis plan.

        Args:
            task: Task dictionary with input documents and specifications
            analysisPlan: Analysis plan from _createAnalysisPlan (currently not
                read by this method)

        Returns:
            Analysis results dictionary, including "datasets" and
            "documentContext"; on failure it contains "error" instead of a plan
        """
        try:
            # Extract data from input documents
            inputDocuments = task.get("inputDocuments", [])
            datasets, documentContext = self._extractData(inputDocuments)

            # Get task information
            prompt = task.get("prompt", "")
            outputSpecs = task.get("outputSpecifications", [])

            # Analyze task requirements
            analysisResults = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs)

            # Add datasets and context to results
            analysisResults["datasets"] = datasets
            analysisResults["documentContext"] = documentContext

            return analysisResults

        except Exception as e:
            logger.error(f"Error analyzing data: {str(e)}", exc_info=True)
            return {
                "error": str(e),
                "datasets": {},
                "documentContext": ""
            }

    async def _createOutputDocuments(self, prompt: str, analysisResults: Dict[str, Any], outputSpecs: List[Dict[str, Any]], analysisPlan: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Create output documents based on analysis results.

        One document is produced per output specification; a specification that
        fails yields a plain-text error document instead of aborting the rest.

        Args:
            prompt: Original task prompt
            analysisResults: Results from data analysis
            outputSpecs: List of output specifications
            analysisPlan: Analysis plan from _createAnalysisPlan

        Returns:
            List of document objects
        """
        documents = []
        datasets = analysisResults.get("datasets", {})
        documentContext = analysisResults.get("documentContext", "")

        # Process each output specification
        for spec in outputSpecs:
            outputLabel = spec.get("label", "")
            outputDescription = spec.get("description", "")

            # Determine format from filename
            formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt"

            try:
                # Create appropriate document based on format
                if formatType in ["png", "jpg", "jpeg", "svg"]:
                    # Visualization output
                    document = await self._createVisualization(
                        datasets, prompt, outputLabel, analysisPlan, outputDescription
                    )
                elif formatType in ["csv", "json", "xlsx"]:
                    # Data document output
                    document = await self._createDataDocument(
                        datasets, prompt, outputLabel, analysisPlan, outputDescription
                    )
                else:
                    # Text document output (markdown, html, text)
                    document = await self._createTextDocument(
                        datasets, documentContext, prompt, outputLabel, formatType, analysisPlan, outputDescription
                    )

                documents.append(document)

            except Exception as e:
                logger.error(f"Error creating output document {outputLabel}: {str(e)}", exc_info=True)
                # Create error document
                errorDoc = self.formatAgentDocumentOutput(
                    outputLabel,
                    f"Error creating document: {str(e)}",
                    "text/plain"
                )
                documents.append(errorDoc)

        return documents


# Factory function for the Analyst agent
def getAgentAnalyst():
    """Returns an instance of the Analyst agent."""
    return AgentAnalyst()