From 8ea05780d1adb7dcc7fd0d641992d1e80f1af5b5 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sun, 11 May 2025 03:04:02 +0200 Subject: [PATCH] prod azure 1.1.1 fix agents --- modules/agentAnalyst.py | 296 +++++++++++++++++++++++++----- modules/agentCoach.py | 3 +- modules/agentCoder.py | 3 +- modules/agentDocumentation.py | 3 +- modules/agentEmail.py | 1 - modules/agentWebcrawler.py | 24 ++- modules/documentProcessor.py | 4 + modules/gatewayInterface.py | 33 +++- modules/lucydomInterface.py | 53 +++++- modules/workflowAgentsRegistry.py | 19 +- modules/workflowManager.py | 233 +++++++++++++++++++++-- notes/changelog.txt | 9 +- 12 files changed, 582 insertions(+), 99 deletions(-) diff --git a/modules/agentAnalyst.py b/modules/agentAnalyst.py index b967af5f..93107371 100644 --- a/modules/agentAnalyst.py +++ b/modules/agentAnalyst.py @@ -38,11 +38,10 @@ class AgentAnalyst(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom - + async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ - Process a task by focusing on required outputs and using AI to generate them. + Process a task by focusing on required outputs and using AI to guide the analysis process. 
Args: task: Task dictionary with prompt, inputDocuments, outputSpecifications @@ -53,68 +52,49 @@ class AgentAnalyst(AgentBase): try: # Extract task information prompt = task.get("prompt", "") - inputDocuments = task.get("inputDocuments", []) outputSpecs = task.get("outputSpecifications", []) + workflow = task.get("context", {}).get("workflow", {}) # Check AI service if not self.mydom: return { - "feedback": "The Analyst agent requires an AI service to function.", + "feedback": "The Analyst agent requires an AI service to function effectively.", "documents": [] } - # Extract data from documents - focusing only on dataExtracted - self.mydom.logAdd(task["workflowId"], "Extracting data from documents...", level="info", progress=35) - datasets, documentContext = self._extractData(inputDocuments) + # Create analysis plan + if workflow: + self.workflowManager.logAdd(workflow, "Extracting data from documents...", level="info", progress=35) + analysisPlan = await self._createAnalysisPlan(prompt) - # Generate task analysis to understand what's needed - self.mydom.logAdd(task["workflowId"], "Analyzing task requirements...", level="info", progress=45) - analysisPlan = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs) + # Check if this is truly an analysis task + if not analysisPlan.get("requiresAnalysis", True): + return { + "feedback": "This task doesn't appear to require analysis. 
Please try a different agent.", + "documents": [] + } - # Generate all required output documents - documents = [] + # Analyze data + if workflow: + self.workflowManager.logAdd(workflow, "Analyzing task requirements...", level="info", progress=45) + analysisResults = await self._analyzeData(task, analysisPlan) - # If no output specs provided, create default analysis outputs - if not outputSpecs: - outputSpecs = [] - - # Process each output specification + # Format results into requested output documents totalSpecs = len(outputSpecs) for i, spec in enumerate(outputSpecs): - progress = 45 + int((i / totalSpecs) * 45) # Progress from 45% to 90% - self.mydom.logAdd(task["workflowId"], f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress) - - outputLabel = spec.get("label", "") - outputDescription = spec.get("description", "") - - # Determine type based on file extension - outputType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "txt" - - # Generate appropriate content based on output type - if outputType in ['png', 'jpg', 'jpeg', 'svg']: - # Create visualization - document = await self._createVisualization( - datasets, prompt, outputLabel, analysisPlan, outputDescription - ) - documents.append(document) - elif outputType in ['csv', 'json', 'xlsx']: - # Create data document - document = await self._createDataDocument( - datasets, prompt, outputLabel, analysisPlan, outputDescription - ) - documents.append(document) - else: - # Create text document (report, analysis, etc.) 
- document = await self._createTextDocument( - datasets, documentContext, prompt, outputLabel, - outputType, analysisPlan, outputDescription - ) - documents.append(document) + progress = 50 + int((i / totalSpecs) * 40) # Progress from 50% to 90% + if self.workflowManager: + self.workflowManager.logAdd(workflow, f"Creating output {i+1}/{totalSpecs}...", level="info", progress=progress) + + documents = await self._createOutputDocuments( + prompt, + analysisResults, + outputSpecs, + analysisPlan + ) # Generate feedback - feedback = f"{analysisPlan.get('feedback')}" - if analysisPlan.get("insights"): - feedback += f"\n\n{analysisPlan.get('insights')}" + feedback = analysisPlan.get("feedback", f"I analyzed '{prompt[:50]}...' and generated {len(documents)} output documents.") return { "feedback": feedback, @@ -122,7 +102,7 @@ class AgentAnalyst(AgentBase): } except Exception as e: - logger.error(f"Error in analysis: {str(e)}", exc_info=True) + logger.error(f"Error during analysis: {str(e)}", exc_info=True) return { "feedback": f"Error during analysis: {str(e)}", "documents": [] @@ -337,6 +317,124 @@ class AgentAnalyst(AgentBase): ], "feedback": f"I'll analyze the data and provide insights about {prompt}" } + + async def _createAnalysisPlan(self, prompt: str) -> Dict[str, Any]: + """ + Create an analysis plan based on the task prompt. 
+ + Args: + prompt: The task prompt + + Returns: + Analysis plan dictionary + """ + try: + # Create analysis prompt + analysisPrompt = f""" + Analyze this data analysis task and create a detailed plan: + + TASK: {prompt} + + Create a detailed analysis plan in JSON format with: + {{ + "requiresAnalysis": true/false, + "analysisSteps": [ + {{ + "step": "step description", + "purpose": "why this step is needed", + "techniques": ["technique1", "technique2"], + "outputs": ["output1", "output2"] + }} + ], + "visualizations": [ + {{ + "type": "visualization type", + "purpose": "what it shows", + "settings": {{"key": "value"}} + }} + ], + "insights": [ + {{ + "type": "insight type", + "description": "what to look for" + }} + ], + "feedback": "explanation of the analysis approach" + }} + + Respond with ONLY the JSON object, no additional text or explanations. + """ + + # Get analysis plan from AI + response = await self.mydom.callAi([ + {"role": "system", "content": "You are a data analysis expert. Create detailed analysis plans. 
Respond with valid JSON only."}, + {"role": "user", "content": analysisPrompt} + ], produceUserAnswer=True) + + # Extract JSON + jsonStart = response.find('{') + jsonEnd = response.rfind('}') + 1 + + if jsonStart >= 0 and jsonEnd > jsonStart: + plan = json.loads(response[jsonStart:jsonEnd]) + return plan + else: + # Fallback plan + logger.warning(f"Not able creating analysis plan, generating fallback plan") + return { + "requiresAnalysis": True, + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data" + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } + + except Exception as e: + logger.warning(f"Error creating analysis plan: {str(e)}") + # Simple fallback plan + return { + "requiresAnalysis": True, + "analysisSteps": [ + { + "step": "Basic data analysis", + "purpose": "Understand the data structure and content", + "techniques": ["summary statistics", "data visualization"], + "outputs": ["summary report", "basic visualizations"] + } + ], + "visualizations": [ + { + "type": "basic charts", + "purpose": "Show data distribution and relationships", + "settings": {} + } + ], + "insights": [ + { + "type": "basic insights", + "description": "Key findings from the data" + } + ], + "feedback": f"I'll analyze the data and provide insights about {prompt}" + } async def _createVisualization(self, datasets: Dict, prompt: str, outputLabel: str, analysisPlan: Dict, description: str) -> Dict: @@ -770,6 +868,102 @@ class AgentAnalyst(AgentBase): # Convert to base64 return base64.b64encode(imageData).decode('utf-8') + async def 
_analyzeData(self, task: Dict[str, Any], analysisPlan: Dict[str, Any]) -> Dict[str, Any]: + """ + Analyze data based on the analysis plan. + + Args: + task: Task dictionary with input documents and specifications + analysisPlan: Analysis plan from _createAnalysisPlan + + Returns: + Analysis results dictionary + """ + try: + # Extract data from input documents + inputDocuments = task.get("inputDocuments", []) + datasets, documentContext = self._extractData(inputDocuments) + + # Get task information + prompt = task.get("prompt", "") + outputSpecs = task.get("outputSpecifications", []) + + # Analyze task requirements + analysisResults = await self._analyzeTask(prompt, documentContext, datasets, outputSpecs) + + # Add datasets and context to results + analysisResults["datasets"] = datasets + analysisResults["documentContext"] = documentContext + + return analysisResults + + except Exception as e: + logger.error(f"Error analyzing data: {str(e)}", exc_info=True) + return { + "error": str(e), + "datasets": {}, + "documentContext": "" + } + + async def _createOutputDocuments(self, prompt: str, analysisResults: Dict[str, Any], + outputSpecs: List[Dict[str, Any]], analysisPlan: Dict[str, Any]) -> List[Dict[str, Any]]: + """ + Create output documents based on analysis results. + + Args: + prompt: Original task prompt + analysisResults: Results from data analysis + outputSpecs: List of output specifications + analysisPlan: Analysis plan from _createAnalysisPlan + + Returns: + List of document objects + """ + documents = [] + datasets = analysisResults.get("datasets", {}) + documentContext = analysisResults.get("documentContext", "") + + # Process each output specification + for spec in outputSpecs: + outputLabel = spec.get("label", "") + outputDescription = spec.get("description", "") + + # Determine format from filename + formatType = outputLabel.split('.')[-1].lower() if '.' 
in outputLabel else "txt" + + try: + # Create appropriate document based on format + if formatType in ["png", "jpg", "jpeg", "svg"]: + # Visualization output + document = await self._createVisualization( + datasets, prompt, outputLabel, analysisPlan, outputDescription + ) + elif formatType in ["csv", "json", "xlsx"]: + # Data document output + document = await self._createDataDocument( + datasets, prompt, outputLabel, analysisPlan, outputDescription + ) + else: + # Text document output (markdown, html, text) + document = await self._createTextDocument( + datasets, documentContext, prompt, outputLabel, formatType, + analysisPlan, outputDescription + ) + + documents.append(document) + + except Exception as e: + logger.error(f"Error creating output document {outputLabel}: {str(e)}", exc_info=True) + # Create error document + errorDoc = self.formatAgentDocumentOutput( + outputLabel, + f"Error creating document: {str(e)}", + "text/plain" + ) + documents.append(errorDoc) + + return documents + # Factory function for the Analyst agent def getAgentAnalyst(): diff --git a/modules/agentCoach.py b/modules/agentCoach.py index 6a0616bf..5995dff7 100644 --- a/modules/agentCoach.py +++ b/modules/agentCoach.py @@ -33,8 +33,7 @@ class AgentCoach(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom - + async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Process a task by directly using AI to provide answers or content based on extracted data. diff --git a/modules/agentCoder.py b/modules/agentCoder.py index 60dc0072..8950e6c7 100644 --- a/modules/agentCoder.py +++ b/modules/agentCoder.py @@ -41,8 +41,7 @@ class AgentCoder(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom - + async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Process a task and perform code development/execution. 
diff --git a/modules/agentDocumentation.py b/modules/agentDocumentation.py index 38b401d2..259795fe 100644 --- a/modules/agentDocumentation.py +++ b/modules/agentDocumentation.py @@ -30,8 +30,7 @@ class AgentDocumentation(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom - + async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Process a task by focusing on required outputs and using AI to generate them. diff --git a/modules/agentEmail.py b/modules/agentEmail.py index 6686b725..116e58fc 100644 --- a/modules/agentEmail.py +++ b/modules/agentEmail.py @@ -46,7 +46,6 @@ class AgentEmail(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom self._loadConfiguration() def _loadConfiguration(self): diff --git a/modules/agentWebcrawler.py b/modules/agentWebcrawler.py index 7dd87825..56cac5b1 100644 --- a/modules/agentWebcrawler.py +++ b/modules/agentWebcrawler.py @@ -52,7 +52,6 @@ class AgentWebcrawler(AgentBase): def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" - self.mydom = mydom async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ @@ -68,6 +67,7 @@ class AgentWebcrawler(AgentBase): # Extract task information prompt = task.get("prompt", "") outputSpecs = task.get("outputSpecifications", []) + workflow = task.get("context", {}).get("workflow", {}) # Check AI service if not self.mydom: @@ -77,7 +77,8 @@ class AgentWebcrawler(AgentBase): } # Create research plan - self.mydom.logAdd(task["workflowId"], "Creating research plan...", level="info", progress=35) + if workflow: + self.workflowManager.logAdd(workflow, "Creating research plan...", level="info", progress=35) researchPlan = await self._createResearchPlan(prompt) # Check if this is truly a web research task @@ -88,11 +89,13 @@ class AgentWebcrawler(AgentBase): } # Gather raw material through web research - 
self.mydom.logAdd(task["workflowId"], "Gathering research material...", level="info", progress=45) - rawResults = await self._gatherResearchMaterial(researchPlan) + if workflow: + self.workflowManager.logAdd(workflow, "Gathering research material...", level="info", progress=45) + rawResults = await self._gatherResearchMaterial(researchPlan, workflow) # Format results into requested output documents - self.mydom.logAdd(task["workflowId"], "Creating output documents...", level="info", progress=55) + if workflow: + self.workflowManager.logAdd(workflow, "Creating output documents...", level="info", progress=55) documents = await self._createOutputDocuments( prompt, rawResults, @@ -191,12 +194,13 @@ class AgentWebcrawler(AgentBase): "feedback": f"I'll conduct web research on '{prompt}' and gather relevant information." } - async def _gatherResearchMaterial(self, researchPlan: Dict[str, Any]) -> List[Dict[str, Any]]: + async def _gatherResearchMaterial(self, researchPlan: Dict[str, Any], workflow: Dict[str, Any]) -> List[Dict[str, Any]]: """ Gather research material based on the research plan. 
Args: researchPlan: Research plan dictionary + workflow: Current workflow object Returns: List of research results @@ -207,7 +211,8 @@ class AgentWebcrawler(AgentBase): directUrls = researchPlan.get("directUrls", [])[:self.maxUrl] for i, url in enumerate(directUrls): progress = 45 + int((i / len(directUrls)) * 5) # Progress from 45% to 50% - self.mydom.logAdd(researchPlan.get("workflowId"), f"Processing direct URL {i+1}/{len(directUrls)}...", level="info", progress=progress) + if hasattr(self, 'workflowManager') and self.workflowManager: + self.workflowManager.logAdd(workflow, f"Processing direct URL {i+1}/{len(directUrls)}...", level="info", progress=progress) logger.info(f"Processing direct URL: {url}") try: # Fetch and extract content @@ -233,7 +238,8 @@ class AgentWebcrawler(AgentBase): searchTerms = researchPlan.get("searchTerms", [])[:self.maxSearchTerms] for i, term in enumerate(searchTerms): progress = 50 + int((i / len(searchTerms)) * 5) # Progress from 50% to 55% - self.mydom.logAdd(researchPlan.get("workflowId"), f"Searching term {i+1}/{len(searchTerms)}...", level="info", progress=progress) + if hasattr(self, 'workflowManager') and self.workflowManager: + self.workflowManager.logAdd(workflow, f"Searching term {i+1}/{len(searchTerms)}...", level="info", progress=progress) logger.info(f"Searching for: {term}") try: # Perform search @@ -262,7 +268,7 @@ class AgentWebcrawler(AgentBase): if len(allResults) >= self.maxResults: break - # Create summaries in parallel for all results + # Create summaries for all results allResults = await self._summarizeAllResults(allResults, researchPlan) return allResults diff --git a/modules/documentProcessor.py b/modules/documentProcessor.py index 3e082578..ee48dc07 100644 --- a/modules/documentProcessor.py +++ b/modules/documentProcessor.py @@ -17,6 +17,10 @@ pdfExtractorLoaded = False officeExtractorLoaded = False imageProcessorLoaded = False +class FileProcessingError(Exception): + """Custom exception for file processing 
errors.""" + pass + def getDocumentContents(fileMetadata: Dict[str, Any], fileContent: bytes) -> List[Dict[str, Any]]: """ Main function for extracting content from a file based on its MIME type. diff --git a/modules/gatewayInterface.py b/modules/gatewayInterface.py index ea5aa542..8008e1f9 100644 --- a/modules/gatewayInterface.py +++ b/modules/gatewayInterface.py @@ -123,27 +123,50 @@ class GatewayInterface: def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ - Unified user access management function that filters data based on user privileges. + Unified user access management function that filters data based on user privileges + and adds access control attributes. Args: table: Name of the table recordset: Recordset to filter based on access rules Returns: - Filtered recordset based on user privilege level + Filtered recordset with access control attributes """ userPrivilege = self.currentUser.get("privilege", "user") + filtered_records = [] # Apply filtering based on privilege if userPrivilege == "sysadmin": - return recordset # System admins see all records + filtered_records = recordset # System admins see all records elif userPrivilege == "admin": # Admins see records in their mandate - return [r for r in recordset if r.get("mandateId") == self.mandateId] + filtered_records = [r for r in recordset if r.get("mandateId") == self.mandateId] else: # Regular users # Users only see records they own within their mandate - return [r for r in recordset + filtered_records = [r for r in recordset if r.get("mandateId") == self.mandateId and r.get("userId") == self.userId] + + # Add access control attributes to each record + for record in filtered_records: + record_id = record.get("id") + + # Set access control flags based on user permissions + if table == "mandates": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("mandates", record_id) + record["_hideDelete"] = not 
self._canModify("mandates", record_id) + elif table == "users": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("users", record_id) + record["_hideDelete"] = not self._canModify("users", record_id) + else: + # Default access control for other tables + record["_hideView"] = False + record["_hideEdit"] = not self._canModify(table, record_id) + record["_hideDelete"] = not self._canModify(table, record_id) + + return filtered_records def _canModify(self, table: str, recordId: Optional[int] = None) -> bool: """ diff --git a/modules/lucydomInterface.py b/modules/lucydomInterface.py index b09629d5..91d7769a 100644 --- a/modules/lucydomInterface.py +++ b/modules/lucydomInterface.py @@ -162,35 +162,72 @@ class LucyDOMInterface: def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ - Unified user access management function that filters data based on user privileges. + Unified user access management function that filters data based on user privileges + and adds access control attributes. 
Args: table: Name of the table recordset: Recordset to filter based on access rules Returns: - Filtered recordset based on user privilege level + Filtered recordset with access control attributes """ userPrivilege = self.currentUser.get("privilege", "user") + filtered_records = [] # Apply filtering based on privilege if userPrivilege == "sysadmin": - return recordset # System admins see all records + filtered_records = recordset # System admins see all records elif userPrivilege == "admin": # Admins see records in their mandate - return [r for r in recordset if r.get("mandateId") == self.mandateId] + filtered_records = [r for r in recordset if r.get("mandateId") == self.mandateId] else: # Regular users # To see all prompts from mandate 0 and own if table == "prompts": - return [r for r in recordset if + filtered_records = [r for r in recordset if (r.get("mandateId") == self.mandateId and r.get("userId") == self.userId) or (r.get("mandateId") == 0) ] - # Users see only their records - return [r for r in recordset + else: + # Users see only their records + filtered_records = [r for r in recordset if r.get("mandateId") == self.mandateId and r.get("userId") == self.userId] - + + # Add access control attributes to each record + for record in filtered_records: + record_id = record.get("id") + + # Set access control flags based on user permissions + if table == "prompts": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("prompts", record_id) + record["_hideDelete"] = not self._canModify("prompts", record_id) + elif table == "files": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("files", record_id) + record["_hideDelete"] = not self._canModify("files", record_id) + record["_hideDownload"] = not self._canModify("files", record_id) + elif table == "workflows": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("workflows", record_id) + 
record["_hideDelete"] = not self._canModify("workflows", record_id) + elif table == "workflowMessages": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("workflows", record.get("workflowId")) + record["_hideDelete"] = not self._canModify("workflows", record.get("workflowId")) + elif table == "workflowLogs": + record["_hideView"] = False # Everyone can view + record["_hideEdit"] = not self._canModify("workflows", record.get("workflowId")) + record["_hideDelete"] = not self._canModify("workflows", record.get("workflowId")) + else: + # Default access control for other tables + record["_hideView"] = False + record["_hideEdit"] = not self._canModify(table, record_id) + record["_hideDelete"] = not self._canModify(table, record_id) + + return filtered_records + def _canModify(self, table: str, recordId: Optional[int] = None) -> bool: """ Checks if the current user can modify (create/update/delete) records in a table. diff --git a/modules/workflowAgentsRegistry.py b/modules/workflowAgentsRegistry.py index 0847442b..0d3e03b9 100644 --- a/modules/workflowAgentsRegistry.py +++ b/modules/workflowAgentsRegistry.py @@ -32,6 +32,7 @@ class AgentBase: self.description = "Basic agent functionality" self.capabilities = [] self.mydom = None + self.workflowManager = None # Will be set by workflow manager def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" @@ -58,11 +59,16 @@ class AgentBase: Args: task: A dictionary containing: - taskId: Unique ID for this task - - workflowId: ID of the parent workflow (optional) + - workflowId: ID of the parent workflow - prompt: The main instruction for the agent - inputDocuments: List of document objects to process - outputSpecifications: List of required output documents - - context: Additional contextual information + - context: Additional contextual information including: + - workflow: The complete workflow object + - workflowRound: Current workflow round + - 
agentType: Type of agent + - timestamp: Task timestamp + - language: User language Returns: A dictionary containing: @@ -208,6 +214,11 @@ class AgentRegistry: self.mydom = mydom self.updateAgentDependencies() + def setWorkflowManager(self, workflowManager): + """Set the workflow manager reference for all agents.""" + for agent in self.agents.values(): + agent.workflowManager = workflowManager + def updateAgentDependencies(self): """Update dependencies for all registered agents.""" for agentId, agent in self.agents.items(): @@ -239,8 +250,8 @@ class AgentRegistry: if agentIdentifier in self.agents: agent = self.agents[agentIdentifier] # Ensure the agent has the AI service - if hasattr(agent, 'setDependencies') and self.mydom: - agent.setDependencies(mydom=self.mydom) + if self.mydom: + agent.mydom = self.mydom return agent logger.error(f"Agent with identifier '{agentIdentifier}' not found") return None diff --git a/modules/workflowManager.py b/modules/workflowManager.py index c4a6b520..99237e02 100644 --- a/modules/workflowManager.py +++ b/modules/workflowManager.py @@ -12,6 +12,7 @@ import uuid import base64 from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, Union, Tuple +import time from modules.mimeUtils import isTextMimeType, determineContentEncoding @@ -58,6 +59,7 @@ class WorkflowManager: self.mydom = domInterface(mandateId, userId) self.agentRegistry = getAgentRegistry() self.agentRegistry.setMydom(self.mydom) + self.agentRegistry.setWorkflowManager(self) # Set self as workflow manager for all agents ### Workflow State Machine Implementation @@ -132,6 +134,7 @@ class WorkflowManager: Returns: Updated workflow with processing results """ + startTime = time.time() try: # State 3: User Message Processing self.checkExitCriteria(workflow) @@ -161,8 +164,42 @@ class WorkflowManager: } self.messageAdd(workflow, responseMessage) - self.logAdd(workflow, f"Planned outputs: {len(objFinalDocuments)} documents", level="info", 
progress=20) - self.logAdd(workflow, f"Work plan created with {len(objWorkplan)} steps", level="info", progress=25) + # Add detailed log entry about the task plan + taskPlanLog = "Input: " + if objFinalDocuments: + taskPlanLog += ", ".join(objFinalDocuments) + "
" + else: + taskPlanLog += "No input files
" + + # Work Plan Steps + for i, task in enumerate(objWorkplan, 1): + agentName = task.get("agent", "unknown") + taskPlanLog += f"{i}. Agent {agentName}
" + + # Input Documents + inputDocs = task.get("inputDocuments", []) + if inputDocs: + inputLabels = [doc.get("label", "unknown") for doc in inputDocs] + taskPlanLog += f"- Input: {', '.join(inputLabels)}
" + + # Task Prompt + prompt = task.get('prompt', 'No prompt') + taskPlanLog += f"- Task: {prompt}
" + + # Output Documents + outputDocs = task.get("outputDocuments", []) + if outputDocs: + outputLabels = [doc.get("label", "unknown") for doc in outputDocs] + taskPlanLog += f"- Output: {', '.join(outputLabels)}
" + + # Final Results + taskPlanLog += "Result: " + if objFinalDocuments: + taskPlanLog += ", ".join(objFinalDocuments) + else: + taskPlanLog += "No result files" + + self.logAdd(workflow, taskPlanLog, level="info", progress=25) # State 5: Agent Execution objResults = [] @@ -199,6 +236,10 @@ class WorkflowManager: self.checkExitCriteria(workflow) self.workflowFinish(workflow) + # Update processing time + endTime = time.time() + workflow["dataStats"]["processingTime"] = endTime - startTime + return workflow except Exception as e: @@ -207,10 +248,15 @@ class WorkflowManager: workflow["status"] = "failed" workflow["lastActivity"] = datetime.now().isoformat() + # Update processing time even on error + endTime = time.time() + workflow["dataStats"]["processingTime"] = endTime - startTime + # Update in database self.mydom.updateWorkflow(workflow["id"], { "status": "failed", - "lastActivity": workflow["lastActivity"] + "lastActivity": workflow["lastActivity"], + "dataStats": workflow["dataStats"] }) self.logAdd(workflow, f"Workflow failed: {str(e)}", level="error", progress=100) @@ -241,7 +287,12 @@ class WorkflowManager: "messages": [], # Empty list - will be filled with references "messageIds": [], # Initialize empty messageIds list "logs": [], - "dataStats": {}, + "dataStats": { + "bytesSent": 0, + "bytesReceived": 0, + "tokensUsed": 0, + "processingTime": 0.0 + }, "currentRound": 1, "status": "running", "lastActivity": currentTime, @@ -287,11 +338,24 @@ class WorkflowManager: else: workflow["currentRound"] = 1 + # Ensure dataStats exists with correct field names + if "dataStats" not in workflow: + workflow["dataStats"] = { + "bytesSent": 0, + "bytesReceived": 0, + "tokensUsed": 0, + "processingTime": 0.0 + } + elif "tokenCount" in workflow["dataStats"]: + # Convert old tokenCount to tokensUsed if needed + workflow["dataStats"]["tokensUsed"] = workflow["dataStats"].pop("tokenCount", 0) + # Update in database - only the relevant workflow fields workflowUpdate = { 
"status": workflow["status"], "lastActivity": workflow["lastActivity"], - "currentRound": workflow["currentRound"] + "currentRound": workflow["currentRound"], + "dataStats": workflow["dataStats"] # Include updated dataStats } self.mydom.updateWorkflow(workflowId, workflowUpdate) @@ -474,6 +538,9 @@ JSON_OUTPUT = {{ return [] agentLabel = agent.label + # Set workflow manager reference on the agent + agent.workflowManager = self + # Log the current step outputLabels = [] for doc in task.get("outputDocuments", []): @@ -498,7 +565,7 @@ JSON_OUTPUT = {{ # Prepare input documents for the agent inputDocuments = await self.prepareAgentInputDocuments(task.get('inputDocuments', []), workflow) - + # Create a standardized task object for the agent as per state machine spec agentTask = { "taskId": str(uuid.uuid4()), @@ -507,20 +574,61 @@ JSON_OUTPUT = {{ "inputDocuments": inputDocuments, "outputSpecifications": outputSpecs, "context": { + "workflow": workflow, # Add the complete workflow object "workflowRound": workflow.get("currentRound", 1), "agentType": agentName, "timestamp": datetime.now().isoformat(), "language": self.mydom.userLanguage # Pass language to agent } } - + # Execute the agent with the standardized task try: # Process the task using the agent's standardized interface logger.debug("TASK: "+self.parseJson2text(agentTask)) logger.debug(f"Agent '{agentName}' AI service available: {agent.mydom is not None}") + # Calculate bytes sent before processing + bytesSent = len(json.dumps(agentTask).encode('utf-8')) + for doc in inputDocuments: + if doc.get('data'): + bytesSent += len(doc['data'].encode('utf-8')) + for content in doc.get('contents', []): + if content.get('data'): + bytesSent += len(content['data'].encode('utf-8')) + + # Process the task + startTime = time.time() agentResults = await agent.processTask(agentTask) + endTime = time.time() + + # Calculate bytes received + bytesReceived = len(json.dumps(agentResults).encode('utf-8')) + for doc in 
agentResults.get('documents', []): + if doc.get('content'): + bytesReceived += len(doc['content'].encode('utf-8')) + + # Calculate tokens used (now using bytes) + tokensUsed = bytesSent + bytesReceived + + # Update workflow statistics + if 'dataStats' not in workflow: + workflow['dataStats'] = { + 'bytesSent': 0, + 'bytesReceived': 0, + 'tokensUsed': 0, + 'processingTime': 0 + } + + workflow['dataStats']['bytesSent'] += bytesSent + workflow['dataStats']['bytesReceived'] += bytesReceived + workflow['dataStats']['tokensUsed'] += tokensUsed + workflow['dataStats']['processingTime'] += (endTime - startTime) + + # Update in database + self.mydom.updateWorkflow(workflow["id"], { + "dataStats": workflow['dataStats'] + }) logger.debug(f"Agent '{agentName}' completed task. RESULT: {self.parseJson2text(agentResults)}") @@ -712,6 +820,38 @@ filesDelivered = {self.parseJson2text(matchingDocuments)} messageObject = self.messageAdd(workflow, messageObject) logger.debug(f"message_user = {self.parseJson2text(messageObject)}.") + + # Update statistics for user input + if role == "user": + # Calculate bytes sent + bytesSent = len(messageContent.encode('utf-8')) + for doc in additionalFiles: + if doc.get('data'): + bytesSent += len(doc['data'].encode('utf-8')) + for content in doc.get('contents', []): + if content.get('data'): + bytesSent += len(content['data'].encode('utf-8')) + + # Calculate tokens used (now using bytes) + tokensUsed = bytesSent + + # Update workflow statistics + if 'dataStats' not in workflow: + workflow['dataStats'] = { + 'bytesSent': 0, + 'bytesReceived': 0, + 'tokensUsed': 0, + 'processingTime': 0 + } + + workflow['dataStats']['bytesSent'] += bytesSent + workflow['dataStats']['tokensUsed'] += tokensUsed + + # Update in database + self.mydom.updateWorkflow(workflow["id"], { + "dataStats": workflow['dataStats'] + }) + return messageObject async def processFileIds(self, fileIds: List[int]) -> List[Dict[str, Any]]: @@ -954,6 +1094,7 @@ filesDelivered = 
{self.parseJson2text(matchingDocuments)} """ Adds a message to the workflow and updates lastActivity. Saves the message in the database and updates the workflow with references. + Also updates statistics for the message. Args: workflow: Workflow object @@ -991,6 +1132,35 @@ filesDelivered = {self.parseJson2text(matchingDocuments)} # Set status if not present if "status" not in message: message["status"] = "step" + + # Calculate statistics for the message + bytesSent = len(message.get("content", "").encode('utf-8')) + for doc in message.get("documents", []): + if doc.get("data"): + bytesSent += len(doc["data"].encode('utf-8')) + for content in doc.get("contents", []): + if content.get("data"): + bytesSent += len(content["data"].encode('utf-8')) + + # Calculate tokens used (now using bytes) + tokensUsed = bytesSent + + # Update workflow statistics + if "dataStats" not in workflow: + workflow["dataStats"] = { + "bytesSent": 0, + "bytesReceived": 0, + "tokensUsed": 0, + "processingTime": 0 + } + + # Update statistics based on message role + if message["role"] == "user": + workflow["dataStats"]["bytesSent"] += bytesSent + workflow["dataStats"]["tokensUsed"] += tokensUsed + else: # assistant messages + workflow["dataStats"]["bytesReceived"] += bytesSent + workflow["dataStats"]["tokensUsed"] += tokensUsed # Add message to workflow workflow["messages"].append(message) @@ -1008,15 +1178,39 @@ filesDelivered = {self.parseJson2text(matchingDocuments)} # Save to database - first the message itself self.mydom.createWorkflowMessage(message) - # Then save the workflow with updated references + # Then save the workflow with updated references and statistics workflowUpdate = { "lastActivity": currentTime, - "messageIds": workflow["messageIds"] # Update the messageIds field + "messageIds": workflow["messageIds"], + "dataStats": workflow["dataStats"] # Include updated statistics } self.mydom.updateWorkflow(workflow["id"], workflowUpdate) return message + def _trimDataInJson(self, 
jsonObj: Any) -> Any: + """ + Trims the data attribute in JSON objects while preserving other content. + + Args: + jsonObj: JSON object to process + + Returns: + Processed JSON object with trimmed data attribute + """ + if isinstance(jsonObj, dict): + # Create a copy to avoid modifying the original + result = jsonObj.copy() + if 'data' in result: + # Trim data attribute if it's a string + if isinstance(result['data'], str): + result['data'] = result['data'][:100] + '...' + # If it's a dict or list, convert to string and trim + else: + result['data'] = str(result['data'])[:100] + '...' + return result + return jsonObj + def logAdd(self, workflow: Dict[str, Any], message: str, level: str = "info", progress: Optional[int] = None) -> str: """ @@ -1045,11 +1239,24 @@ filesDelivered = {self.parseJson2text(matchingDocuments)} # Set agentName from global settings agentName = GLOBAL_WORKFLOW_LABELS.get("systemName", "unknown") + # Process message if it contains JSON + processedMessage = message + try: + if isinstance(message, str) and ("{" in message or "[" in message): + # Try to parse as JSON + jsonObj = json.loads(message) + # Trim data attribute if present + processedJson = self._trimDataInJson(jsonObj) + processedMessage = json.dumps(processedJson) + except json.JSONDecodeError: + # If parsing fails, use original message + pass + # Create log entry logEntry = { "id": logId, "workflowId": workflow["id"], - "message": message, + "message": processedMessage, "type": level, "timestamp": datetime.now().isoformat(), "agentName": agentName, @@ -1068,11 +1275,11 @@ filesDelivered = {self.parseJson2text(matchingDocuments)} # Also log in logger if level == "info": - logger.info(f"Workflow {workflow['id']}: {message}") + logger.info(f"Workflow {workflow['id']}: {processedMessage}") elif level == "warning": - logger.warning(f"Workflow {workflow['id']}: {message}") + logger.warning(f"Workflow {workflow['id']}: {processedMessage}") elif level == "error": - logger.error(f"Workflow 
{workflow['id']}: {message}")
+                logger.error(f"Workflow {workflow['id']}: {processedMessage}")
 
         return logId
 
diff --git a/notes/changelog.txt b/notes/changelog.txt
index 1aeb06a9..75cb1e2d 100644
--- a/notes/changelog.txt
+++ b/notes/changelog.txt
@@ -1,9 +1,14 @@
 .......................
 TASKS
 
+agentDocumentation delivers a ".docx" file, but its content is ".md" markdown text, not a real Word document
+
+extract access management into separate modules "lucydomAccess.py" and "gatewayAccess.py": move the functions that define which access each role has out of the "*Interface.py" modules
+
+check data extraction of tables in PDF files
+
 Check data extraction of types!
 
-final message with 100% to give
 
@@ -30,7 +35,7 @@ PRIO3:
 Tools to transfer incl funds:
 - Google SERPAPI (shelly)
 - Anthropic Claude (valueon + shelly)
-+
+- Cursor Pro
 
 -----------------------
 DONE