# Listing metadata from the source viewer: Python, 777 lines, 31 KiB, no trailing newline.
"""
|
|
Data analyst agent for analysis and interpretation of data.
|
|
Focuses on output-first design with AI-powered analysis.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import io
|
|
import base64
|
|
from typing import Dict, Any, List
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
|
|
from modules.workflowAgentsRegistry import AgentBase
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AgentAnalyst(AgentBase):
    """AI-driven agent for data analysis and visualization.

    The agent turns input documents into pandas datasets, asks the injected
    AI service for an analysis plan and then produces one document per
    requested output (image, data file or text report).
    """

    def __init__(self):
        """Initialize the agent's identity, capabilities and plotting style."""
        super().__init__()
        self.name = "analyst"
        self.label = "Data Analysis"
        self.description = "Analyzes data using AI-powered insights and visualizations, produce diagrams and visualizations"
        self.capabilities = [
            "dataAnalysis",
            "statistics",
            "visualization",
            "dataInterpretation",
            "reportGeneration",
        ]

        # processTask() reads self.mydom, so the attribute must exist even when
        # setDependencies() was never called. A value that the base class may
        # already have assigned is kept as-is.
        self.mydom = getattr(self, "mydom", None)

        # Set default visualization settings. The seaborn style sheets were
        # renamed to "seaborn-v0_8-*" in matplotlib 3.6; fall back to the old
        # name on earlier versions instead of failing agent construction.
        for styleName in ("seaborn-v0_8-whitegrid", "seaborn-whitegrid"):
            if styleName in plt.style.available:
                plt.style.use(styleName)
                break
        else:
            logger.warning("No seaborn whitegrid style available; using matplotlib defaults")

    def setDependencies(self, mydom=None):
        """Set external dependencies for the agent.

        Args:
            mydom: Service object used by this agent for progress logging
                (``logAdd``) and AI calls (``callAi``).
        """
        self.mydom = mydom
|
|
|
|
async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process a task by focusing on required outputs and using AI to generate them.

    Args:
        task: Task dictionary with prompt, inputDocuments, outputSpecifications
            and workflowId (the latter is only used for progress logging).

    Returns:
        Dictionary with "feedback" (text) and "documents" (list). Errors are
        reported through the feedback text; no exception is propagated.
    """
    imageTypes = ('png', 'jpg', 'jpeg', 'svg')
    dataTypes = ('csv', 'json', 'xlsx')

    def describeInsights(insights) -> str:
        """Render the plan's insights as readable bullet lines."""
        if isinstance(insights, str):
            return insights
        if not isinstance(insights, list):
            return str(insights)
        lines = []
        for item in insights:
            if isinstance(item, dict):
                kind = item.get("type")
                text = item.get("description")
                if kind and text:
                    lines.append(f"- {kind}: {text}")
                elif kind or text:
                    lines.append(f"- {text or kind}")
            elif item:
                lines.append(f"- {item}")
        return "\n".join(lines)

    try:
        # Extract task information; explicit None values are treated as "missing"
        prompt = task.get("prompt", "")
        inputDocuments = task.get("inputDocuments") or []
        outputSpecs = task.get("outputSpecifications") or []
        workflowId = task.get("workflowId")

        # Check AI service
        if not getattr(self, "mydom", None):
            return {
                "feedback": "The Analyst agent requires an AI service to function.",
                "documents": []
            }

        # Extract data from documents - focusing only on dataExtracted
        self.mydom.logAdd(workflowId, "Extracting data from documents...", level="info", progress=35)
        datasets, documentContext = self._extractData(inputDocuments)

        # Generate task analysis to understand what's needed
        self.mydom.logAdd(workflowId, "Analyzing task requirements...", level="info", progress=45)
        analysisPlan = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs)

        # Generate all required output documents. Without output specifications
        # no documents are produced; only the plan feedback is returned.
        documents = []
        totalSpecs = len(outputSpecs)
        for i, spec in enumerate(outputSpecs):
            progress = 45 + int((i / totalSpecs) * 45)  # Progress from 45% to 90%
            self.mydom.logAdd(workflowId, f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress)

            outputLabel = spec.get("label", "")
            outputDescription = spec.get("description", "")

            # Determine type based on file extension
            outputType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt"

            # Generate appropriate content based on output type
            if outputType in imageTypes:
                document = await self._createVisualization(
                    datasets, prompt, outputLabel, analysisPlan, outputDescription
                )
            elif outputType in dataTypes:
                document = await self._createDataDocument(
                    datasets, prompt, outputLabel, analysisPlan, outputDescription
                )
            else:
                # Text document (report, analysis, etc.)
                document = await self._createTextDocument(
                    datasets, documentContext, prompt, outputLabel,
                    outputType, analysisPlan, outputDescription
                )
            documents.append(document)

        # Generate feedback. Missing plan fields are skipped rather than being
        # rendered as the literal text "None" or as a Python repr.
        feedbackParts = []
        planFeedback = analysisPlan.get("feedback")
        if planFeedback:
            feedbackParts.append(str(planFeedback))
        if analysisPlan.get("insights"):
            insightText = describeInsights(analysisPlan.get("insights"))
            if insightText:
                feedbackParts.append(insightText)
        if not feedbackParts:
            feedbackParts.append(f"Analysis completed with {len(documents)} document(s).")

        return {
            "feedback": "\n\n".join(feedbackParts),
            "documents": documents
        }

    except Exception as e:
        logger.error(f"Error in analysis: {str(e)}", exc_info=True)
        return {
            "feedback": f"Error during analysis: {str(e)}",
            "documents": []
        }
|
|
|
|
def _extractData(self, documents: List[Dict[str, Any]]) -> tuple:
    """
    Extract data from documents, focusing on dataExtracted fields.

    Every non-empty ``dataExtracted`` text is appended to the document context.
    In addition the text is parsed into a pandas DataFrame when possible:
    ``.csv``/``.tsv`` documents by extension (comma / tab separated), ``.json``
    documents via ``json.loads`` (a dict containing lists yields one dataset per
    non-empty list, keyed ``"<docName>:<key>"``), and any other multi-line text
    by sniffing for commas or tabs in its first five lines.

    Args:
        documents: List of input documents

    Returns:
        Tuple of (datasets dictionary, document context text)
    """
    datasets = {}
    contextParts = []

    def parseDelimited(text, separator):
        """Parse delimited text; return a DataFrame, or None if parsing fails."""
        try:
            return pd.read_csv(io.StringIO(text), sep=separator)
        except Exception as e:
            # Best effort: unparseable text simply yields no dataset
            logger.debug(f"Could not parse delimited text: {str(e)}")
            return None

    def framesFromJson(text, baseName):
        """Return {datasetName: DataFrame} for JSON text, or None if it is not valid JSON."""
        try:
            jsonData = json.loads(text)
        except (ValueError, TypeError):
            return None

        frames = {}
        try:
            if isinstance(jsonData, list):
                frames[baseName] = pd.DataFrame(jsonData)
            elif isinstance(jsonData, dict):
                # Handle nested JSON structures
                if any(isinstance(v, list) for v in jsonData.values()):
                    for key, value in jsonData.items():
                        if isinstance(value, list) and len(value) > 0:
                            frames[f"{baseName}:{key}"] = pd.DataFrame(value)
                else:
                    frames[baseName] = pd.DataFrame([jsonData])
        except Exception as e:
            logger.debug(f"Could not convert JSON from {baseName} to a table: {str(e)}")
        return frames

    # Process each document
    for doc in documents or []:
        docName = doc.get("name", "unnamed")
        if doc.get("ext"):
            docName = f"{docName}.{doc.get('ext')}"
        lowerName = docName.lower()

        contextParts.append(f"\n\n--- {docName} ---\n")

        # Process contents - focus only on dataExtracted
        for content in doc.get("contents", []):
            extractedText = content.get("dataExtracted")
            if not extractedText:
                continue
            if not isinstance(extractedText, str):
                extractedText = str(extractedText)
            contextParts.append(extractedText)

            # Try to parse as structured data if the extension says so
            handled = False
            if lowerName.endswith(('.csv', '.tsv')):
                # TSV files are tab separated; parsing them with the comma default
                # would collapse every row into a single column.
                separator = '\t' if lowerName.endswith('.tsv') else ','
                df = parseDelimited(extractedText, separator)
                if df is not None:
                    datasets[docName] = df
                    handled = True
            elif lowerName.endswith('.json'):
                frames = framesFromJson(extractedText, docName)
                if frames is not None:
                    # Valid JSON must not be re-interpreted as CSV below, even
                    # when its datasets are stored under "<docName>:<key>".
                    datasets.update(frames)
                    handled = True

            # Try to detect tabular data in text content
            if handled or docName in datasets:
                continue
            lines = extractedText.splitlines()
            if len(lines) <= 2:
                continue
            headLines = lines[:5]
            if any(',' in line for line in headLines):
                separator = ','
            elif any('\t' in line for line in headLines):
                separator = '\t'
            else:
                continue
            df = parseDelimited(extractedText, separator)
            if df is not None and len(df.columns) > 1:
                datasets[docName] = df

    return datasets, "".join(contextParts)
|
|
|
|
async def _analyzeTask(self, prompt: str, documentContext: str, datasets: Dict[str, Any], outputSpecs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Analyze the task requirements using AI.

    Args:
        prompt: The task prompt
        documentContext: Context from input documents
        datasets: Available datasets (name -> pandas DataFrame)
        outputSpecs: Output specifications

    Returns:
        Analysis plan dictionary. When the AI call fails or its answer does not
        contain a JSON object, a generic fallback plan is returned instead.
    """

    def buildFallbackPlan() -> Dict[str, Any]:
        """Generic plan used whenever no plan could be obtained from the AI."""
        datasetNames = list(datasets.keys())
        return {
            "analysisSteps": [
                {
                    "step": "Basic data analysis",
                    "purpose": "Understand the data structure and content",
                    "datasets": datasetNames,
                    "techniques": ["summary statistics", "data visualization"],
                    "outputs": ["summary report", "basic visualizations"]
                }
            ],
            "visualizations": [
                {
                    "type": "basic charts",
                    "purpose": "Show data distribution and relationships",
                    "datasets": list(datasetNames),
                    "settings": {}
                }
            ],
            "insights": [
                {
                    "type": "basic insights",
                    "description": "Key findings from the data",
                    "datasets": list(datasetNames)
                }
            ],
            "feedback": f"I'll analyze the data and provide insights about {prompt}"
        }

    try:
        # DataFrames are not JSON serializable, so describe each dataset by its
        # shape, columns and dtypes instead of dumping the objects themselves.
        datasetOverview = {
            str(name): {
                "shape": list(df.shape),
                "columns": [str(col) for col in df.columns],
                "dtypes": {str(col): str(dtype) for col, dtype in df.dtypes.items()},
            }
            for name, df in datasets.items()
        }
        datasetsJson = json.dumps(datasetOverview, indent=2, default=str)
        outputsJson = json.dumps(outputSpecs, indent=2, default=str)

        # Create analysis prompt
        analysisPrompt = f"""
    Analyze this data analysis task and create a detailed plan:

    TASK: {prompt}

    DOCUMENT CONTEXT:
    {documentContext}

    AVAILABLE DATASETS:
    {datasetsJson}

    REQUIRED OUTPUTS:
    {outputsJson}

    Create a detailed analysis plan in JSON format with:
    {{
        "analysisSteps": [
            {{
                "step": "step description",
                "purpose": "why this step is needed",
                "datasets": ["dataset1", "dataset2"],
                "techniques": ["technique1", "technique2"],
                "outputs": ["output1", "output2"]
            }}
        ],
        "visualizations": [
            {{
                "type": "visualization type",
                "purpose": "what it shows",
                "datasets": ["dataset1"],
                "settings": {{"key": "value"}}
            }}
        ],
        "insights": [
            {{
                "type": "insight type",
                "description": "what to look for",
                "datasets": ["dataset1"]
            }}
        ],
        "feedback": "explanation of the analysis approach"
    }}

    Respond with ONLY the JSON object, no additional text or explanations.
    """

        # Get analysis plan from AI
        response = await self.mydom.callAi([
            {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. Respond with valid JSON only."},
            {"role": "user", "content": analysisPrompt}
        ], produceUserAnswer=True)

        # Extract JSON: take everything from the first "{" to the last "}"
        jsonStart = response.find('{')
        jsonEnd = response.rfind('}') + 1
        if jsonStart >= 0 and jsonEnd > jsonStart:
            plan = json.loads(response[jsonStart:jsonEnd])
            if isinstance(plan, dict):
                return plan

        logger.warning("Not able creating analysis plan, generating fallback plan")
        return buildFallbackPlan()

    except Exception as e:
        logger.warning(f"Error creating analysis plan: {str(e)}")
        return buildFallbackPlan()
|
|
|
|
async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str,
                               analysisPlan: Dict, description: str) -> Dict:
    """
    Create a visualization based on the analysis plan.

    The AI is asked for matplotlib/seaborn code, which is executed against the
    datasets; the resulting figure is returned base64 encoded. If anything
    fails, an image containing the error message is returned instead.

    Args:
        datasets: Dictionary of datasets (name -> pandas DataFrame)
        prompt: Original task prompt
        outputLabel: Output file label; its extension selects the image format
        analysisPlan: Analysis plan
        description: Output description

    Returns:
        Document dictionary with visualization
    """
    import numpy as np  # numpy is not imported at module level

    # Resolve the format before anything can fail, so that every error path
    # below is able to render its message image.
    formatType = outputLabel.split('.')[-1].lower()
    if formatType not in ('png', 'jpg', 'jpeg', 'svg'):
        formatType = 'png'
    # Registered MIME types: there is no "image/jpg" or "image/svg"
    contentType = {
        "jpg": "image/jpeg",
        "jpeg": "image/jpeg",
        "svg": "image/svg+xml",
    }.get(formatType, "image/png")

    def toVariableName(datasetName) -> str:
        """Turn a dataset name into an identifier usable by the generated code."""
        cleaned = ''.join(c if c.isalnum() else '_' for c in str(datasetName))
        return f"ds_{cleaned}" if cleaned[:1].isdigit() else cleaned

    def renderMessageImage(message: str, fontSize: int) -> str:
        """Render a text-only figure and return it base64 encoded."""
        plt.figure(figsize=(10, 6))
        try:
            plt.text(0.5, 0.5, message, ha='center', va='center', fontsize=fontSize)
            plt.tight_layout()
            return self._getImageBase64(formatType)
        finally:
            plt.close()

    # If no datasets available, create error message image (no AI call needed)
    if not datasets:
        return {
            "label": outputLabel,
            "content": renderMessageImage("No data available for visualization", 14),
            "metadata": {
                "contentType": contentType
            }
        }

    try:
        # Get visualization recommendations
        vizRecommendations = analysisPlan.get("visualizations", [])

        if not vizRecommendations:
            # Generate visualization recommendations if none provided.
            # NOTE(review): the plan produced by _analyzeTask carries no
            # "workflowId", so this log call most likely receives None - confirm.
            self.mydom.logAdd(analysisPlan.get("workflowId"), "Generating visualization recommendations...", level="info", progress=50)
            datasetOverview = json.dumps(
                {str(name): {"shape": list(df.shape), "columns": df.columns.tolist()}
                 for name, df in datasets.items()},
                indent=2, default=str
            )
            recommendationPrompt = f"""
    Based on this data and task, recommend appropriate visualizations.

    TASK: {prompt}
    DESCRIPTION: {description}

    DATASETS:
    {datasetOverview}

    Recommend visualizations in JSON format:
    {{
        "visualizations": [
            {{
                "type": "chart_type",
                "dataSource": "dataset_name",
                "variables": ["col1", "col2"],
                "purpose": "explanation"
            }}
        ]
    }}
    """
            try:
                response = await self.mydom.callAi([
                    {"role": "system", "content": "You are a data visualization expert. Recommend appropriate visualizations based on the data and task."},
                    {"role": "user", "content": recommendationPrompt}
                ])

                # Extract JSON
                jsonStart = response.find('{')
                jsonEnd = response.rfind('}') + 1
                if jsonStart >= 0 and jsonEnd > jsonStart:
                    vizData = json.loads(response[jsonStart:jsonEnd])
                    vizRecommendations = vizData.get("visualizations", [])
            except Exception as e:
                # Recommendations are optional; continue with the default below
                logger.warning(f"Could not obtain visualization recommendations: {str(e)}")
                vizRecommendations = []

        # Prepare dataset info for the first dataset if none specified
        if not vizRecommendations:
            name, df = next(iter(datasets.items()))
            vizRecommendations = [{
                "type": "auto",
                "dataSource": name,
                "variables": df.columns.tolist()[:5],
                "purpose": "general analysis"
            }]

        # Namespace for the generated code. Datasets are exposed under a
        # sanitized name, and the first two additionally as "df" / "df2".
        namespace = {"plt": plt, "sns": sns, "pd": pd, "np": np, "io": io, "json": json}
        variableNames = {}
        for name, df in datasets.items():
            varName = toVariableName(name)
            namespace[varName] = df
            variableNames[name] = varName

            # Also add with standard names for simpler code
            if "df" not in namespace:
                namespace["df"] = df
            elif "df2" not in namespace:
                namespace["df2"] = df

        # Datasets referenced by the recommendations. AI recommendations use
        # "dataSource"; the analysis plan uses a "datasets" list.
        referenced = []
        for viz in vizRecommendations:
            if not isinstance(viz, dict):
                continue
            candidates = [viz.get("dataSource")]
            planDatasets = viz.get("datasets") or []
            candidates.extend([planDatasets] if isinstance(planDatasets, str) else planDatasets)
            for candidate in candidates:
                if isinstance(candidate, str) and candidate in datasets and candidate not in referenced:
                    referenced.append(candidate)
        if not referenced:
            # Never leave the code generator without any dataset description
            referenced = list(datasets.keys())

        # Create visualization code prompt
        vizPrompt = f"""
    Generate Python matplotlib/seaborn code to create a visualization for:

    TASK: {prompt}

    VISUALIZATION REQUIREMENTS:
    - Output format: {formatType}
    - Filename: {outputLabel}
    - Description: {description}

    RECOMMENDED VISUALIZATION:
    {json.dumps(vizRecommendations, indent=2, default=str)}

    AVAILABLE DATASETS:
    """

        # Add dataset info for recommended sources
        for dataSource in referenced:
            df = datasets[dataSource]
            vizPrompt += f"\nDataset '{dataSource}':\n"
            vizPrompt += f"- Variable name: {variableNames[dataSource]}\n"
            vizPrompt += f"- Shape: {df.shape}\n"
            vizPrompt += f"- Columns: {df.columns.tolist()}\n"
            vizPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"

        vizPrompt += """
    The datasets are already loaded as pandas DataFrames under the variable names
    listed above (the first dataset is also available as 'df'). Do not read files.

    Generate ONLY Python code that:
    1. Uses matplotlib and/or seaborn to create a clear visualization
    2. Sets figure size to (10, 6)
    3. Includes appropriate titles, labels, and legend
    4. Uses professional color schemes
    5. Handles any missing data gracefully

    Return ONLY executable Python code, no explanations or markdown.
    """

        # Get visualization code from AI
        vizCode = await self.mydom.callAi([
            {"role": "system", "content": "You are a data visualization expert. Provide only executable Python code."},
            {"role": "user", "content": vizPrompt}
        ], produceUserAnswer=True)

        # Clean code
        vizCode = vizCode.replace("```python", "").replace("```", "").strip()

        # Execute visualization code
        plt.figure(figsize=(10, 6))
        try:
            # SECURITY: this runs AI-generated code inside the current process
            # without sandboxing. A single namespace dict is used as globals so
            # that functions and comprehensions defined by the generated code
            # can see the datasets; separate globals/locals hides them.
            exec(vizCode, namespace)

            # Capture the image (the current figure, which may be one the
            # generated code opened itself)
            imgData = self._getImageBase64(formatType)
        finally:
            # Close every figure, including extra ones the generated code
            # created, so that failed or repeated runs do not leak figures.
            plt.close('all')

        return self.formatAgentDocumentOutput(outputLabel, imgData, contentType)

    except Exception as e:
        logger.error(f"Error creating visualization: {str(e)}", exc_info=True)

        # Create error message image
        imgData = renderMessageImage(f"Visualization error: {str(e)}", 12)
        return self.formatAgentDocumentOutput(outputLabel, imgData, contentType)
|
|
|
|
async def _createDataDocument(self, datasets: Dict, prompt: str, outputLabel: str,
                              analysisPlan: Dict, description: str) -> Dict:
    """
    Create a data document (e.g., CSV, JSON) based on analysis.

    The AI is asked for Python code that processes the datasets and stores the
    file content in a variable named ``result``; that code is then executed.

    Args:
        datasets: Dictionary of datasets (name -> pandas DataFrame)
        prompt: Original task prompt
        outputLabel: Output filename; its extension selects the format
        analysisPlan: Analysis plan from AI
        description: Output description

    Returns:
        Data document. When there are no datasets, or when generation fails, a
        plain-text document describing the problem is returned instead.
    """
    # Determine format from filename
    formatType = outputLabel.split('.')[-1].lower()

    # If no datasets available, return error message
    if not datasets:
        return {
            "label": outputLabel,
            "content": f"No data available for processing into {formatType} format.",
            "metadata": {
                "contentType": "text/plain"
            }
        }

    try:
        import numpy as np  # numpy is not imported at module level

        # Namespace for the generated code. Datasets are exposed under a
        # sanitized name, and the first two additionally as "df" / "df2".
        namespace = {"pd": pd, "np": np, "io": io, "json": json}
        variableNames = {}
        for name, df in datasets.items():
            varName = ''.join(c if c.isalnum() else '_' for c in str(name))
            if varName[:1].isdigit():
                # An identifier cannot start with a digit
                varName = f"ds_{varName}"
            namespace[varName] = df
            variableNames[name] = varName

            # Also add with standard names for simpler code
            if "df" not in namespace:
                namespace["df"] = df
            elif "df2" not in namespace:
                namespace["df2"] = df

        # Generate data processing instructions
        dataPrompt = f"""
    Create Python code to process datasets and generate a {formatType} file for:

    TASK: {prompt}

    OUTPUT REQUIREMENTS:
    - Format: {formatType}
    - Filename: {outputLabel}
    - Description: {description}

    ANALYSIS CONTEXT:
    {json.dumps(analysisPlan, indent=2, default=str)}

    AVAILABLE DATASETS:
    """

        # Add dataset info
        for name, df in datasets.items():
            dataPrompt += f"\nDataset '{name}':\n"
            dataPrompt += f"- Variable name: {variableNames[name]}\n"
            dataPrompt += f"- Shape: {df.shape}\n"
            dataPrompt += f"- Columns: {df.columns.tolist()}\n"
            dataPrompt += f"- Sample data: {df.head(3).to_dict(orient='records')}\n"

        dataPrompt += """
    The datasets are already loaded as pandas DataFrames under the variable names
    listed above (the first dataset is also available as 'df'). Do not read files.

    Generate Python code that:
    1. Processes the available dataset(s)
    2. Performs necessary transformations, aggregations, or calculations
    3. Outputs the result in the requested format
    4. Returns the content as a string variable named 'result'

    Return ONLY executable Python code, no explanations or markdown.
    """

        # Get data processing code from AI
        dataCode = await self.mydom.callAi([
            {"role": "system", "content": "You are a data processing expert. Provide only executable Python code."},
            {"role": "user", "content": dataPrompt}
        ], produceUserAnswer=True)

        # Clean code
        dataCode = dataCode.replace("```python", "").replace("```", "").strip()

        # SECURITY: this runs AI-generated code inside the current process
        # without sandboxing. A single namespace dict is used as globals so that
        # functions and comprehensions defined by the generated code can see
        # the datasets; separate globals/locals hides them.
        exec(dataCode, namespace)

        # Get the result
        result = namespace.get("result", "No output was generated.")

        # The generated code is asked for a string, but frequently leaves a
        # DataFrame or a plain Python structure in "result"; serialize those.
        if isinstance(result, pd.DataFrame):
            if formatType == "csv":
                result = result.to_csv(index=False)
            elif formatType == "json":
                result = result.to_json(orient="records")
        elif formatType == "json" and isinstance(result, (dict, list)):
            result = json.dumps(result, indent=2, default=str)

        # Determine content type
        contentType = {
            "csv": "text/csv",
            "json": "application/json",
            "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        }.get(formatType, "text/plain")

        return self.formatAgentDocumentOutput(outputLabel, result, contentType)

    except Exception as e:
        logger.error(f"Error creating data document: {str(e)}", exc_info=True)

        return {
            "label": outputLabel,
            "content": f"Error generating {formatType} document: {str(e)}",
            "metadata": {
                "contentType": "text/plain"
            }
        }
|
|
|
|
async def _createTextDocument(self, datasets: Dict, context: str, prompt: str,
                              outputLabel: str, formatType: str,
                              analysisPlan: Dict, description: str) -> Dict:
    """
    Create a text document (report, analysis, etc.) based on analysis.

    Args:
        datasets: Dictionary of datasets (name -> pandas DataFrame)
        context: Document context text; only the first 2000 characters are
            sent to the AI
        prompt: Original task prompt
        outputLabel: Output filename
        formatType: Output format type ("md"/"markdown", "html" or other text)
        analysisPlan: Analysis plan from AI
        description: Output description

    Returns:
        Text document. If generation fails, a document describing the error is
        returned in the requested format.
    """
    contextLimit = 2000

    # Determine content type based on format
    if formatType in ["md", "markdown"]:
        contentType = "text/markdown"
    elif formatType == "html":
        contentType = "text/html"
    else:
        contentType = "text/plain"

    try:
        # Create dataset summaries
        datasetSummaries = []
        for name, df in datasets.items():
            # Column labels are not necessarily strings (e.g. integer labels
            # for data without a header), so convert before joining.
            columnNames = ', '.join(str(col) for col in df.columns)
            summary = f"Dataset: {name}\n"
            summary += f"- Shape: {df.shape[0]} rows, {df.shape[1]} columns\n"
            summary += f"- Columns: {columnNames}\n"

            # Basic statistics for numeric columns
            numericCols = df.select_dtypes(include=['number']).columns
            if len(numericCols) > 0:
                summary += "- Numeric Columns Stats:\n"
                for col in numericCols[:3]:  # Limit to first 3
                    stats = df[col].describe()
                    summary += f" - {col}: min={stats['min']:.2f}, max={stats['max']:.2f}, mean={stats['mean']:.2f}\n"

            datasetSummaries.append(summary)

        # Only announce truncation when the context really was cut
        contextText = context or ""
        if len(contextText) > contextLimit:
            contextText = f"{contextText[:contextLimit]}... (truncated)"

        # Generate analysis prompt
        analysisPrompt = f"""
    Create a detailed {formatType} document for:

    TASK: {prompt}

    OUTPUT REQUIREMENTS:
    - Format: {formatType}
    - Filename: {outputLabel}
    - Description: {description}

    ANALYSIS CONTEXT:
    {json.dumps(analysisPlan, indent=2, default=str)}

    DATASET SUMMARIES:
    {"".join(datasetSummaries)}

    DOCUMENT CONTEXT:
    {contextText}

    Create a comprehensive, professional analysis document that addresses the task requirements.
    The document should:
    1. Have a clear structure with headings and sections
    2. Include relevant data findings and insights
    3. Provide appropriate interpretations and recommendations
    4. Format the content according to the required output format

    Your response should be the complete document content in the specified format.
    """

        # Get document content from AI
        documentContent = await self.mydom.callAi([
            {"role": "system", "content": f"You are a data analysis expert creating a {formatType} document."},
            {"role": "user", "content": analysisPrompt}
        ], produceUserAnswer=True)

        # Make sure Markdown starts with a heading and HTML has a document wrapper
        if formatType in ["md", "markdown"] and not documentContent.strip().startswith("#"):
            documentContent = f"# Analysis Report\n\n{documentContent}"
        elif formatType == "html" and "<html" not in documentContent.lower():
            documentContent = f"<html><body>{documentContent}</body></html>"

        return self.formatAgentDocumentOutput(outputLabel, documentContent, contentType)

    except Exception as e:
        logger.error(f"Error creating text document: {str(e)}", exc_info=True)

        # Create a simple error document
        if formatType in ["md", "markdown"]:
            content = f"# Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"
        elif formatType == "html":
            content = f"<html><body><h1>Error in Analysis</h1><p>There was an error generating the analysis: {str(e)}</p></body></html>"
        else:
            content = f"Error in Analysis\n\nThere was an error generating the analysis: {str(e)}"

        return {
            "label": outputLabel,
            "content": content,
            "metadata": {
                "contentType": contentType
            }
        }
|
|
|
|
def _getImageBase64(self, formatType: str = 'png') -> str:
    """
    Serialize the current matplotlib figure and return it as base64 text.

    Args:
        formatType: Image format passed to ``plt.savefig``

    Returns:
        Base64 encoded string of the image
    """
    # Render the active figure into an in-memory stream at 100 dpi
    with io.BytesIO() as stream:
        plt.savefig(stream, format=formatType, dpi=100)
        rawBytes = stream.getvalue()

    # Convert to base64
    encoded = base64.b64encode(rawBytes)
    return encoded.decode('utf-8')
|
|
|
|
|
|
# Factory function for the Analyst agent
|
|
def getAgentAnalyst():
    """Create a new Analyst agent and return it."""
    agent = AgentAnalyst()
    return agent