diff --git a/README_document_test.md b/README_document_test.md deleted file mode 100644 index 7bf052f2..00000000 --- a/README_document_test.md +++ /dev/null @@ -1,114 +0,0 @@ -# Document Extraction Test - -This test procedure validates the DocumentManager's ability to extract content from files using AI-powered analysis. - -## Files Created - -- `test_document_extraction.py` - Main test script -- `test_sample_document.txt` - Sample document for testing -- `run_document_test.ps1` - PowerShell wrapper script -- `test_document_extraction.log` - Generated log file (cleared on each run) - -## Usage - -### Method 1: Using PowerShell Script (Recommended) - -```powershell -# Test with default sample file -.\run_document_test.ps1 - -# Test with custom file -.\run_document_test.ps1 "path\to\your\document.pdf" -``` - -### Method 2: Direct Python Execution - -```bash -# Test with default sample file -python test_document_extraction.py test_sample_document.txt - -# Test with custom file -python test_document_extraction.py "path/to/your/document.docx" -``` - -## Test Features - -1. **File Validation**: Checks if the specified file exists -2. **MIME Type Detection**: Automatically detects file type based on extension -3. **Content Extraction**: Uses the DocumentManager to extract content -4. **AI Processing**: Applies the prompt "summarize the content and give list of the major topics" -5. **Comprehensive Logging**: Logs all steps and results to `test_document_extraction.log` -6. **Log Cleanup**: Clears the log file on each test run - -## Supported File Types - -- Text files (.txt, .md) -- CSV files (.csv) -- JSON files (.json) -- XML files (.xml) -- HTML files (.html, .htm) -- Images (.jpg, .jpeg, .png, .gif, .svg) -- PDF files (.pdf) -- Office documents (.docx, .xlsx, .pptx) -- And more (fallback to binary processing) - -## Test Output - -The test generates detailed logs including: - -- File information (path, size, MIME type) -- Extraction process details -- Extracted content summary -- AI-processed results -- Error details if any issues occur - -## Example Output - -``` -=== STARTING DOCUMENT EXTRACTION TEST === -File information: { - "file_path": "test_sample_document.txt", - "filename": "test_sample_document.txt", - "mime_type": "text/plain", - "file_size_bytes": 2048, - "file_size_mb": 0.0 -} -Document extraction completed successfully: { - "extracted_content_id": "test-doc-1234567890", - "content_items_count": 1, - "object_type": "ExtractedContent" -} -COMPLETE EXTRACTED CONTENT: { - "total_length": 1500, - "content": "PowerOn System Architecture Overview... [AI processed summary]" -} -``` - -## Error Handling - -The test includes comprehensive error handling for: - -- File not found errors -- File reading errors -- Document processing errors -- AI processing errors -- Import errors - -All errors are logged with detailed information for debugging. - -## Configuration - -The test uses the same configuration as other tests: - -- Environment variable: `POWERON_CONFIG_FILE = 'test_config.ini'` -- Log file: `test_document_extraction.log` -- Log level: DEBUG - -## Dependencies - -The test requires the same dependencies as the main PowerOn system: - -- Python 3.8+ -- Required Python packages (see requirements.txt) -- Access to AI services (if AI processing is enabled) -- Proper configuration in test_config.ini \ No newline at end of file diff --git a/modules/historic_data_agents/agentCoder.py b/modules/historic_data_agents/agentCoder.py deleted file mode 100644 index 8cb4d869..00000000 --- a/modules/historic_data_agents/agentCoder.py +++ /dev/null @@ -1,1039 +0,0 @@ -""" -Coder agent for generating and executing code. -Provides code generation, execution, and improvement capabilities. -""" - -import logging -from typing import Dict, Any, List, Tuple, Optional -import json -import os -import sys -import subprocess -import tempfile -import shutil -import venv -import importlib.util -from datetime import datetime -import uuid - -from modules.workflow.agentBase import AgentBase -from modules.shared.configuration import APP_CONFIG -from modules.interfaces.serviceChatModel import Task, ChatDocument, ChatContent -from modules.shared.attributeUtils import ModelMixin - -logger = logging.getLogger(__name__) - -class AgentCoder(AgentBase): - """Simplified Agent for developing and executing Python code with integrated executor""" - - def __init__(self): - """Initialize the coder agent""" - super().__init__() - self.name = "coder" - self.label = "Developer and Code Executor" - self.description = "Develops and executes Python code for data processing and automation" - self.capabilities = [ - "code_development", - "data_processing", - "file_processing", - "automation", - "code_execution" - ] - - # Executor settings - self.executorTimeout = int(APP_CONFIG.get("Agent_Coder_EXECUTION_TIMEOUT")) # seconds - self.executionRetryLimit = int(APP_CONFIG.get("Agent_Coder_EXECUTION_RETRY")) # max retries - self.tempDir = None - - def setDependencies(self, serviceBase=None): - """Set external dependencies for the agent.""" - self.setService(serviceBase) - - async def processTask(self, task: Task) -> Dict[str, Any]: - """ - Process a task and perform code development/execution. - First checks if the task can be completed without code execution, - then falls back to code generation if needed. - Enhanced to ensure all generated documents are included in output. - - Args: - task: Task object with prompt, inputDocuments, outputSpecifications - - Returns: - Dictionary with feedback and documents - """ - # 1. Extract task information - prompt = task.prompt - inputDocuments = task.filesInput - outputSpecs = task.filesOutput - - # Check if AI service is available - if not self.service or not self.service.base: - logger.error("No AI service configured for the Coder agent") - return { - "feedback": "The Coder agent is not properly configured.", - "documents": [] - } - - # 2. Extract data from documents in separate categories - documentData = [] # For raw file data (for code execution) - contentData = [] # For content data (later use) - contentExtraction = [] # For AI-extracted data (for quick completion) - - for doc in inputDocuments: - # Create proper filename from name and ext - filename = f"{doc.name}.{doc.ext}" if doc.ext else doc.name - - # Add main document data to documentData if it exists - docData = doc.data - if docData: - isBase64 = True # Assume base64 encoded for document data - documentData.append([filename, docData, isBase64]) - - # Process contents for different uses - if doc.contents: - for content in doc.contents: - contentName = content.name - - # For AI-extracted data (quick completion) - if content.data: - contentExtraction.append({ - "filename": filename, - "contentName": contentName, - "contentData": content.data, - "contentType": content.contentType, - "summary": content.summary - }) - - # For raw content data - if content.data: - rawData = content.data - isBase64 = content.metadata.get('base64Encoded', False) if content.metadata else False - contentData.append({ - "filename": filename, - "contentName": contentName, - "data": rawData, - "isBase64": isBase64, - "contentType": content.contentType - }) - - # Also add to documentData for code execution if not already added - if not docData or docData != rawData: - documentData.append([filename, rawData, isBase64]) - - # 3. Check if task can be completed without code execution - quickCompletion = await self._checkQuickCompletion(prompt, contentExtraction, outputSpecs) - - if quickCompletion and quickCompletion.get("complete") == 1: - logger.info("Task completed without code execution") - return { - "feedback": quickCompletion.get("prompt", "Task completed successfully."), - "documents": quickCompletion.get("documents", []) - } - else: - logger.debug(f"Code to generate, no quick check") - - # If quick completion not possible, continue with code generation and execution - logger.info("Generating code to solve the task") - - # 4. Generate code using AI - code, requirements = await self._generateCode(prompt, outputSpecs) - if not code: - return { - "feedback": "Failed to generate code for the task.", - "documents": [] - } - # Store the original code without document data - original_clean_code = code # Save clean code for later use in improvement - - # 5. Replace the placeholder with actual inputFiles data - documentDataJson = repr(documentData) - codeWithData = code.replace("inputFiles = \"=== JSONLOAD ===\"", f"inputFiles = {documentDataJson}") - - # 6. Execute code with retry logic - retryCount = 0 - maxRetries = self.executionRetryLimit - executionHistory = [] - - while retryCount <= maxRetries: - executionResult = self._executeCode(codeWithData, requirements) - executionHistory.append({ - "attempt": retryCount + 1, - "code": codeWithData, - "result": executionResult - }) - - # Check if execution was successful - if executionResult.get("success", False): - logger.info(f"Code execution succeeded on attempt {retryCount + 1}") - break - - # If we've reached max retries, exit the loop - if retryCount >= maxRetries: - logger.info(f"Reached maximum retry limit ({maxRetries}). Giving up.") - break - - # Log the error and attempt to improve the code - error = executionResult.get("error", "Unknown error") - logger.info(f"Execution attempt {retryCount + 1} failed: {error}. Attempting to improve code.") - - # Generate improved code based on error - improvedCode, improvedRequirements = await self._improveCode( - originalCode=original_clean_code, # Use clean code without document data - error=error, - executionResult=executionResult, - attempt=retryCount + 1, - outputSpecs=outputSpecs - ) - - if improvedCode: - # Inject document data into improved code - original_clean_code = improvedCode # Update clean code for next potential improvement - codeWithData = improvedCode.replace("inputFiles = \"=== JSONLOAD ===\"", f"inputFiles = {documentDataJson}") - requirements = improvedRequirements - logger.info(f"Code improved for retry {retryCount + 2}") - else: - logger.warning("Failed to improve code, using original code for retry") - - retryCount += 1 - - # 7. Process results and create output documents - documents = [] - - # Always add the final code document - documents.append(self.formatAgentDocumentOutput("generated_code.py", codeWithData, "text/plain")) - - # Add execution history document - executionHistoryStr = json.dumps(executionHistory, indent=2) - documents.append(self.formatAgentDocumentOutput("execution_history.json", executionHistoryStr, "application/json")) - - # Enhanced result handling: Create documents based on execution results - fixed for proper content extraction - if executionResult.get("success", False): - resultData = executionResult.get("result") - - # Process results from the result dictionary if available - if isinstance(resultData, dict): - # First, create a mapping of expected output labels to their specs - expectedOutputs = {spec.get("label"): spec for spec in outputSpecs} - createdOutputs = set() - - for label, result_item in resultData.items(): - # Check if result follows the expected structure with nested content - if isinstance(result_item, dict) and "content" in result_item: - # Extract values from the properly structured result - content = result_item.get("content", "") # Extract the inner content - base64Encoded = result_item.get("base64Encoded", False) - contentType = result_item.get("contentType", "text/plain") - - # Check if this label matches one of our expected output documents - # If not, but we haven't created all expected outputs yet, try to map it - finalLabel = label - if label not in expectedOutputs and len(expectedOutputs) > 0: - # Find an unused expected output label - for expectedLabel in expectedOutputs: - if expectedLabel not in createdOutputs: - logger.warning(f"Remapping output '{label}' to expected '{expectedLabel}'") - finalLabel = expectedLabel - break - - # Create document by passing only the content to formatAgentDocumentOutput - doc = self.formatAgentDocumentOutput(finalLabel, content, contentType) - - # Override the base64Encoded flag with the value from the result - # This is needed since formatAgentDocumentOutput might determine a different value - if isinstance(base64Encoded, bool): - doc.base64Encoded = base64Encoded - - documents.append(doc) - createdOutputs.add(finalLabel) - logger.info(f"Created document from result: {finalLabel} ({contentType}, base64={base64Encoded})") - else: - # Not properly structured - log warning - logger.warning(f"Skipping improperly formatted result for '{label}'. Results must include 'content' field.") - else: - # Handle non-dictionary results - logger.warning("Execution result is not a dictionary. Creating a single output document.") - doc = self.formatAgentDocumentOutput("result.txt", str(resultData), "text/plain") - documents.append(doc) - - # 8. Return results - return { - "feedback": "Code execution completed successfully." if executionResult.get("success", False) else f"Code execution failed: {executionResult.get('error', 'Unknown error')}", - "documents": documents - } - - async def _improveCode(self, originalCode: str, error: str, executionResult: Dict[str, Any], attempt: int, outputSpecs: List[Dict[str, Any]] = None) -> Tuple[str, List[str]]: - """ - Improve code based on execution error. - Enhanced to maintain proper output handling with correct document structure. - - Args: - originalCode: The code that failed to execute - error: The error message - executionResult: Complete execution result dictionary - attempt: Current attempt number - outputSpecs: List of expected output specifications - - Returns: - Tuple of (improvedCode, requirements) - """ - # Create a string with output specifications to be included in the prompt - outputSpecsStr = "" - if outputSpecs: - outputSpecsStr = "\nEXPECTED OUTPUT DOCUMENTS:\n" - for i, spec in enumerate(outputSpecs, 1): - label = spec.get("label", f"output{i}.txt") - description = spec.get("description", "") - outputSpecsStr += f"{i}. {label} - {description}\n" - - # Create prompt for code improvement - improvementPrompt = f""" -Fix the following Python code that failed during execution. This is attempt {attempt} to fix the code. - -ORIGINAL CODE: -{originalCode} - -ERROR MESSAGE: -{error} - -STDOUT: -{executionResult.get('output', '')} -{outputSpecsStr} -INSTRUCTIONS: -1. Fix all errors identified in the error message -2. If there is a requirements error for missing or failes modules, then create alternate code with other modules -3. Diagnose and fix any logical issues -4. Pay special attention to: -- Type conversions and data handling -- Error handling and edge cases -- Resource management (file handles, etc.) -- Syntax errors and typos -5. Keep the inputFiles handling logic intact -6. Maintain the same overall structure and purpose - -OUTPUT REQUIREMENTS (VERY IMPORTANT): -- Your code MUST define a 'result' variable as a dictionary to store ALL outputs -- The key for each entry MUST be the full filename with extension (e.g., "output.txt") -- The value for each entry MUST be a dictionary with the following structure: -{{ - "content": string, # The actual content (text or base64-encoded string) - "base64Encoded": boolean, # Set to true for binary data, false for text data - "contentType": string # MIME type of the content (e.g., "text/plain", "application/json") -}} -- Example result dictionary: -result = {{ - "output.txt": {{ - "content": "This is text content", - "base64Encoded": False, - "contentType": "text/plain" - }}, - "chart.png": {{ - "content": "base64encodedstring...", - "base64Encoded": True, - "contentType": "image/png" - }} -}} -- NEVER write files to disk using open() or similar methods - use the result dictionary instead - -JSON OUTPUT (CRITICAL): -- After creating the result dictionary, you MUST print it as JSON to stdout -- Make sure your code includes: print(json.dumps(result)) as the final line -- This printed JSON is how the system captures your result - -REQUIREMENTS: -Required packages should be specified as: -# REQUIREMENTS: library==version,library2>=version -- You may add/remove requirements as needed to fix the code - -Return ONLY Python code without explanations or markdown. -""" - - # Call AI service - messages = [ - {"role": "system", "content": "You are an expert Python code debugger. Provide only fixed Python code without explanations or formatting. Ensure all generated files are included in the 'result' dictionary and that result is printed as JSON with print(json.dumps(result))."}, - {"role": "user", "content": improvementPrompt} - ] - - try: - improvedContent = await self.service.base.callAi(messages, temperature=0.2) - - # Extract code and requirements - improvedCode = self._cleanCode(improvedContent) - - # Extract requirements - requirements = [] - for line in improvedCode.split('\n'): - if line.strip().startswith("# REQUIREMENTS:"): - reqStr = line.replace("# REQUIREMENTS:", "").strip() - requirements = [r.strip() for r in reqStr.split(',') if r.strip()] - break - - return improvedCode, requirements - except Exception as e: - logger.error(f"Error improving code: {str(e)}") - return None, [] - - - async def _checkQuickCompletion(self, prompt: str, contentExtraction: List[ChatDocument], outputSpecs: List[Dict[str, Any]]) -> Dict[str, Any]: - """ - Check if the task can be completed without writing and executing code. - - Args: - prompt: The task prompt - contentExtraction: List of extracted content data with contentName and dataExtracted - outputSpecs: List of output specifications - - Returns: - Dictionary with completion status and results, or None if no quick completion - """ - # If no data or no output specs, can't do a quick completion - if not contentExtraction or not outputSpecs: - return None - - # Create a prompt for the AI to check if this can be completed directly - specsJson = json.dumps(outputSpecs) - dataJson = json.dumps([doc.dict() for doc in contentExtraction]) - - checkPrompt = f""" -Analyze this task and determine if it can be completed directly without writing code. - -TASK: -{prompt} - -EXTRACTED DATA AVAILABLE: -{dataJson} - -Each entry in the extracted data contains: -- filename: The source file name -- contentName: The specific content section name -- contentData: The AI-extracted text from the content -- contentType: The type of content (text, csv, etc.) -- summary: A brief summary of the content - -REQUIRED OUTPUT: -{specsJson} - -If the task can be completed directly with the available extracted data, respond with: -{{"complete": 1, "prompt": "Brief explanation of the solution", "documents": [ - {{"label": "filename.ext", "content": "content here"}} -]}} - -If code would be needed to properly complete this task, respond with: -{{"complete": 0, "prompt": "Explanation why code is needed"}} - -Only return valid JSON. Your entire response must be parseable as JSON. -""" - - # Call AI service - logger.debug(f"Checking if task can be completed without code execution: {checkPrompt}") - messages = [ - {"role": "system", "content": "You are an AI assistant that determines if tasks require code execution. Reply with JSON only."}, - {"role": "user", "content": checkPrompt} - ] - - try: - # Use a lower temperature for more deterministic response - response = await self.service.base.callAi(messages, produceUserAnswer = True, temperature=0.1) - - # Parse response as JSON - if response: - try: - # Find JSON in response if there's any text around it - jsonStart = response.find('{') - jsonEnd = response.rfind('}') + 1 - - if jsonStart >= 0 and jsonEnd > jsonStart: - jsonStr = response[jsonStart:jsonEnd] - result = json.loads(jsonStr) - - # Check if this is a proper response - if "complete" in result: - return result - - except json.JSONDecodeError: - logger.debug("Failed to parse quick completion response as JSON") - pass - except Exception as e: - logger.debug(f"Error during quick completion check: {str(e)}") - - # Default to requiring code execution - return None - - async def _generateCode(self, prompt: str, outputSpecs: List[ChatDocument] = None) -> Tuple[str, List[str]]: - """ - Generate Python code from a prompt with the inputFiles placeholder. - Enhanced to emphasize proper result output handling with correct document structure. - - Args: - prompt: The task prompt - outputSpecs: List of expected output specifications - - Returns: - Tuple of (code, requirements) - """ - # Create a string with output specifications to be included in the prompt - outputSpecsStr = "" - if outputSpecs: - outputSpecsStr = "\nEXPECTED OUTPUT DOCUMENTS:\n" - for i, spec in enumerate(outputSpecs, 1): - label = spec.get("label", f"output{i}.txt") - description = spec.get("description", "") - outputSpecsStr += f"{i}. {label} - {description}\n" - - # Create improved prompt for code generation - aiPrompt = f""" -Generate Python code to solve the following task: - -TASK: -{prompt} -{outputSpecsStr} -INPUT FILES: -- 'inputFiles' variable is provided as [[filename, data, isBase64], ...] -- For text files (isBase64=False): use data directly as string -- For binary files (isBase64=True): use base64.b64decode(data) - -OUTPUT REQUIREMENTS (VERY IMPORTANT): -- Your code MUST define a 'result' variable as a dictionary to store ALL outputs -- The key for each entry MUST be the full filename with extension (e.g., "output.txt") -- The value for each entry MUST be a dictionary with the following structure: -{{ - "content": string, # The actual content (text or base64-encoded string) - "base64Encoded": boolean, # Set to true for binary data, false for text data - "contentType": string # MIME type of the content (e.g., "text/plain", "application/json") -}} -- Example result dictionary: -result = {{ - "output.txt": {{ - "content": "This is text content", - "base64Encoded": False, - "contentType": "text/plain" - }}, - "chart.png": {{ - "content": "base64encodedstring...", - "base64Encoded": True, - "contentType": "image/png" - }} -}} -- NEVER write files to disk using open() or similar methods - use the result dictionary instead -- If you generate any charts, reports, or visualizations, ensure they are properly encoded and included - -IMPORTANT - USE EXACT OUTPUT FILENAMES: -- You MUST use the EXACT filenames specified in EXPECTED OUTPUT DOCUMENTS section -- The key in the result dictionary must match these filenames precisely -- If no output documents are specified, use appropriate descriptive filenames - -JSON OUTPUT (CRITICAL): -- After creating the result dictionary, you MUST print it as JSON to stdout using json.dumps() -- Add these lines at the end of your code: - import json # if not already imported - print(json.dumps(result)) -- This printed JSON is how the system captures your result -- Make sure this is the last thing your code prints - -BINARY DATA HANDLING: -- For binary content (images, PDFs, etc.), convert to base64 string and set base64Encoded=True -- For text content (text, JSON, HTML, etc.), use plain string and set base64Encoded=False -- Use appropriate MIME types for different content types - -CODE QUALITY: -- Use explicit type conversions where needed (int/float/str) -- Implement feature detection, not version checks -- Handle errors gracefully with appropriate fallbacks -- Follow latest API conventions for libraries -- Validate inputs before processing - -Your code must start with: -inputFiles = "=== JSONLOAD ===" # DO NOT CHANGE THIS LINE - -REQUIREMENTS: -Required packages should be specified as: -# REQUIREMENTS: library==version,library2>=version -- Specify exact versions for critical libraries -- Use constraint operators (==,>=,<=) as needed - -Return ONLY Python code without explanations or markdown. -""" - - # Call AI service - messages = [ - {"role": "system", "content": "You are a Python code generator. Provide only valid Python code without explanations or formatting. Always output the result dictionary as JSON using print(json.dumps(result)) at the end of your code."}, - {"role": "user", "content": aiPrompt} - ] - - generatedContent = await self.service.base.callAi(messages, temperature=0.1) - - # Extract code and requirements - code = self._cleanCode(generatedContent) - - # Extract requirements - requirements = [] - for line in code.split('\n'): - if line.strip().startswith("# REQUIREMENTS:"): - reqStr = line.replace("# REQUIREMENTS:", "").strip() - requirements = [r.strip() for r in reqStr.split(',') if r.strip()] - break - - return code, requirements - - def _executeCodeProd(self, code: str, requirements: List[str] = None) -> Dict[str, Any]: - """ - Execute Python code in Azure environment using the antenv interpreter. - Optimized for production use in Azure Web App environment where venv creation fails. - - Args: - code: Python code to execute - requirements: List of required packages - - Returns: - Execution result dictionary - """ - try: - # 1. Create temp directory for code files - self.tempDir = tempfile.mkdtemp(prefix="code_exec_") - - # Try different possible paths to find the antenv Python interpreter - possible_python_paths = [ - "/home/site/wwwroot/antenv/bin/python", - "/antenv/bin/python", - "/tmp/8dd8c226509f116/antenv/bin/python", # Path from your error logs - sys.executable # Fallback to system Python - ] - - pythonExe = None - for path in possible_python_paths: - if os.path.exists(path): - pythonExe = path - logger.info(f"Found Python interpreter at: {pythonExe}") - break - - if not pythonExe: - logger.error("Could not find a valid Python interpreter in Azure environment") - return { - "success": False, - "output": "", - "error": "Could not find a valid Python interpreter in Azure environment", - "result": None, - "exitCode": -1 - } - - # 2. Install requirements to a temporary user directory if provided - if requirements: - logger.info(f"Installing requirements in Azure environment: {requirements}") - - # Create requirements.txt - reqFile = os.path.join(self.tempDir, "requirements.txt") - with open(reqFile, "w") as f: - f.write("\n".join(requirements)) - - # Set up a custom PYTHONUSERBASE to isolate package installations - custom_user_base = os.path.join(self.tempDir, "pip_packages") - os.makedirs(custom_user_base, exist_ok=True) - - env = os.environ.copy() - env["PYTHONUSERBASE"] = custom_user_base - - # Install requirements to the custom user directory - try: - pipResult = subprocess.run( - [pythonExe, "-m", "pip", "install", "--user", "-r", reqFile], - capture_output=True, - text=True, - env=env, - timeout=int(APP_CONFIG.get("Agent_Coder_INSTALL_TIMEOUT")) - ) - - if pipResult.returncode != 0: - logger.warning(f"Error installing requirements in Azure: {pipResult.stderr}") - else: - logger.info(f"Requirements installed successfully to {custom_user_base}") - - # Try to find the site-packages directory - import glob - site_packages = os.path.join(custom_user_base, "lib", "python*", "site-packages") - site_packages_paths = glob.glob(site_packages) - - if site_packages_paths: - env["PYTHONPATH"] = os.pathsep.join([site_packages_paths[0], env.get("PYTHONPATH", "")]) - logger.info(f"Added {site_packages_paths[0]} to PYTHONPATH") - else: - # Alternative paths for different Python versions - alt_site_packages = os.path.join(custom_user_base, "site-packages") - if os.path.exists(alt_site_packages): - env["PYTHONPATH"] = os.pathsep.join([alt_site_packages, env.get("PYTHONPATH", "")]) - logger.info(f"Added {alt_site_packages} to PYTHONPATH") - except Exception as e: - logger.warning(f"Exception during requirements installation in Azure: {str(e)}") - else: - env = os.environ.copy() - - # 3. Write code to file - codeFile = os.path.join(self.tempDir, "code.py") - with open(codeFile, "w", encoding="utf-8") as f: - f.write(code) - - # 4. Execute code with the modified environment - logger.debug(f"Executing code in Azure environment with timeout of {self.executorTimeout} seconds") - process = subprocess.run( - [pythonExe, codeFile], - timeout=self.executorTimeout, - capture_output=True, - text=True, - env=env - ) - - # 5. Process results - stdout = process.stdout - stderr = process.stderr - - # Try to extract result from stdout - resultData = None - if process.returncode == 0: - try: - # Find the last line that might be JSON - jsonLines = [] - for line in stdout.strip().split('\n'): - line = line.strip() - if line and line[0] in '{[' and line[-1] in '}]': - try: - parsed = json.loads(line) - jsonLines.append((line, parsed)) - except json.JSONDecodeError: - continue - - # Use the last valid JSON that appears to be a dictionary - if jsonLines: - for line, parsed in reversed(jsonLines): - if isinstance(parsed, dict): - resultData = parsed - logger.debug(f"Extracted result data from stdout: {type(resultData)}") - break - except Exception as e: - logger.debug(f"Error extracting result from stdout: {str(e)}") - - # Enhanced logging of what was found - if resultData: - logger.info(f"Found result dictionary with {len(resultData)} entries: {list(resultData.keys())}") - else: - logger.warning("No result dictionary found in output") - - # Create result dictionary - return { - "success": process.returncode == 0, - "output": stdout, - "error": stderr if process.returncode != 0 else "", - "result": resultData, - "exitCode": process.returncode - } - - except subprocess.TimeoutExpired: - logger.error(f"Execution in Azure timed out after {self.executorTimeout} seconds") - return { - "success": False, - "output": "", - "error": f"Execution timed out after {self.executorTimeout} seconds", - "result": None, - "exitCode": -1 - } - except Exception as e: - logger.error(f"Execution error in Azure environment: {str(e)}") - return { - "success": False, - "output": "", - "error": f"Execution error in Azure environment: {str(e)}", - "result": None, - "exitCode": -1 - } - finally: - # Clean up resources - self._cleanupExecution() - - def _executeCodeVenv(self, code: str, requirements: List[str] = None) -> Dict[str, Any]: - """ - Execute Python code in a virtual environment. - Original implementation with venv creation for non-Azure environments. - - Args: - code: Python code to execute - requirements: List of required packages - - Returns: - Execution result dictionary - """ - try: - # 1. Create temp directory and virtual environment - self.tempDir = tempfile.mkdtemp(prefix="code_exec_") - venvPath = os.path.join(self.tempDir, "venv") - - # Create venv - logger.debug(f"Creating virtual environment at {venvPath}") - - try: - # First try with sys.executable - the standard approach - subprocess.run([sys.executable, "-m", "venv", venvPath], - check=True, capture_output=True, timeout=60) - logger.debug("Virtual environment created successfully with sys.executable") - except (subprocess.SubprocessError, subprocess.CalledProcessError) as e: - logger.warning(f"Failed to create venv with sys.executable: {str(e)}") - - # Fallback method 1: Try with explicit 'python3' command - try: - logger.debug("Trying to create virtual environment with python3 command") - subprocess.run(["python3", "-m", "venv", venvPath], - check=True, capture_output=True, timeout=60) - logger.debug("Virtual environment created successfully with python3") - except (subprocess.SubprocessError, subprocess.CalledProcessError) as e: - logger.warning(f"Failed to create venv with python3: {str(e)}") - - # Fallback method 2: Try with virtualenv instead of venv - try: - logger.debug("Trying to create virtual environment with virtualenv module") - subprocess.run([sys.executable, "-m", "pip", "install", "virtualenv"], - check=False, capture_output=True, timeout=60) - subprocess.run([sys.executable, "-m", "virtualenv", venvPath], - check=True, capture_output=True, timeout=60) - logger.debug("Virtual environment created successfully with virtualenv") - except (subprocess.SubprocessError, subprocess.CalledProcessError) as e: - # If all methods fail, raise an exception - error_msg = f"Failed to create virtual environment with all methods: {str(e)}" - logger.error(error_msg) - raise RuntimeError(error_msg) - - # Get Python executable path - adjusted for OS - if os.name == 'nt': # Windows - pythonExe = os.path.join(venvPath, "Scripts", "python.exe") - else: # Linux/Mac - pythonExe = os.path.join(venvPath, "bin", "python") - - # Verify python executable exists - if not os.path.exists(pythonExe): - # Try to find it - if os.name == 'nt': - possible_paths = [ - os.path.join(venvPath, "Scripts", "python.exe"), - os.path.join(venvPath, "Scripts", "python") - ] - else: - possible_paths = [ - os.path.join(venvPath, "bin", "python"), - os.path.join(venvPath, "bin", "python3") - ] - - for path in possible_paths: - if os.path.exists(path): - pythonExe = path - logger.debug(f"Found Python executable at: {pythonExe}") - break - - if not os.path.exists(pythonExe): - logger.error(f"Python executable not found at expected path: {pythonExe}") - raise FileNotFoundError(f"Python executable not found in virtual environment") - - # 2. Install requirements if provided - if requirements: - logger.info(f"Installing requirements: {requirements}") - - # Create requirements.txt - reqFile = os.path.join(self.tempDir, "requirements.txt") - with open(reqFile, "w") as f: - f.write("\n".join(requirements)) - - x="\n".join(requirements) - logger.info(f"Requirements file: {x}.") - - # Install requirements - try: - pipResult = subprocess.run( - [pythonExe, "-m", "pip", "install", "-r", reqFile], - capture_output=True, - text=True, - timeout=int(APP_CONFIG.get("Agent_Coder_INSTALL_TIMEOUT")) - ) - if pipResult.returncode != 0: - logger.debug(f"Error installing requirements: {pipResult.stderr}") - else: - logger.debug(f"Requirements installed successfully") - # Log installed packages if in debug mode - if logger.isEnabledFor(logging.DEBUG): - pipList = subprocess.run( - [pythonExe, "-m", "pip", "list"], - capture_output=True, - text=True - ) - logger.debug(f"Installed packages:\n{pipList.stdout}") - - except Exception as e: - logger.debug(f"Exception during requirements installation: {str(e)}") - - # 3. Write code to file - codeFile = os.path.join(self.tempDir, "code.py") - with open(codeFile, "w", encoding="utf-8") as f: - f.write(code) - - # 4. Execute code - logger.debug(f"Executing code with timeout of {self.executorTimeout} seconds. Code: {code}") - process = subprocess.run( - [pythonExe, codeFile], - timeout=self.executorTimeout, - capture_output=True, - text=True - ) - - # 5. Process results - stdout = process.stdout - stderr = process.stderr - - # Try to extract result from stdout - resultData = None - if process.returncode == 0: - try: - # Find the last line that might be JSON - jsonLines = [] - for line in stdout.strip().split('\n'): - line = line.strip() - if line and line[0] in '{[' and line[-1] in '}]': - try: - parsed = json.loads(line) - jsonLines.append((line, parsed)) - except json.JSONDecodeError: - continue - - # Use the last valid JSON that appears to be a dictionary - if jsonLines: - for line, parsed in reversed(jsonLines): - if isinstance(parsed, dict): - resultData = parsed - logger.debug(f"Extracted result data from stdout: {type(resultData)}") - break - except Exception as e: - logger.debug(f"Error extracting result from stdout: {str(e)}") - - # Enhanced logging of what was found - if resultData: - logger.info(f"Found result dictionary with {len(resultData)} entries: {list(resultData.keys())}") - else: - logger.warning("No result dictionary found in output") - - # Create result dictionary - return { - "success": process.returncode == 0, - "output": stdout, - "error": stderr if process.returncode != 0 else "", - "result": resultData, - "exitCode": process.returncode - } - - except subprocess.TimeoutExpired: - logger.error(f"Execution timed out after {self.executorTimeout} seconds") - return { - "success": False, - "output": "", - "error": f"Execution timed out after {self.executorTimeout} seconds", - "result": None, - "exitCode": -1 - } - except Exception as e: - logger.error(f"Execution error: {str(e)}") - return { - "success": False, - "output": "", - "error": f"Execution error: {str(e)}", - "result": None, - "exitCode": -1 - } - finally: - # Clean up resources - self._cleanupExecution() - - def _executeCode(self, code: str, requirements: List[str] = None) -> Dict[str, Any]: - """ - Execute Python code in the appropriate environment based on configuration. - - Args: - code: Python code to execute - requirements: List of required packages - - Returns: - Execution result dictionary - """ - # Check if we're in a production Azure environment - env_type = APP_CONFIG.get("APP_ENV_TYPE", "dev").lower() - - logger.info(f"Executing code in environment type: {env_type}") - - if env_type == "prod": - # Use the Azure-optimized execution method - logger.info("Using Azure-optimized code execution method") - return self._executeCodeProd(code, requirements) - else: - # Use the standard virtual environment execution method - logger.info("Using standard virtual environment execution method") - return self._executeCodeVenv(code, requirements) - - - def _cleanupExecution(self): - """Clean up temporary resources from code execution.""" - if self.tempDir and os.path.exists(self.tempDir): - try: - logger.debug(f"Cleaning up temporary directory: {self.tempDir}") - shutil.rmtree(self.tempDir) - self.tempDir = None - except Exception as e: - logger.warning(f"Error cleaning up temp directory: {str(e)}") - - def _cleanCode(self, code: str) -> str: - """Remove any markdown formatting or explanations.""" - # Remove code block markers - code = code.replace("```python", "").replace("```", "") - - # Remove explanations before or after code - lines = code.strip().split('\n') - startIndex = 0 - endIndex = len(lines) - - # Find start of actual code - for i, line in enumerate(lines): - if line.strip().startswith("inputFiles =") or line.strip().startswith("# REQUIREMENTS:"): - startIndex = i - break - - # Clean code - cleanedCode = '\n'.join(lines[startIndex:endIndex]) - return cleanedCode.strip() - - def formatAgentDocumentOutput(self, filename: str, content: str, contentType: str) -> ChatDocument: - """ - Format a document for agent output. - - Args: - filename: Output filename - content: Document content - contentType: MIME type of the content - - Returns: - ChatDocument object - """ - # Split filename into name and extension - name, ext = os.path.splitext(filename) - if ext.startswith('.'): - ext = ext[1:] - - # Create document object - return ChatDocument( - id=str(uuid.uuid4()), - name=name, - ext=ext, - data=content, - contents=[ - ChatContent( - name="main", - data=content, - summary=f"Generated {filename}", - metadata={"contentType": contentType} - ) - ] - ) - -# Factory function for the Coder agent -def getAgentCoder(): - """Returns an instance of the Coder agent.""" - return AgentCoder() \ No newline at end of file diff --git a/modules/historic_data_agents/agentDocumentation.py b/modules/historic_data_agents/agentDocumentation.py deleted file mode 100644 index 1cf3e3b2..00000000 --- a/modules/historic_data_agents/agentDocumentation.py +++ /dev/null @@ -1,537 +0,0 @@ -""" -Documentation agent for generating structured documentation. -Provides comprehensive documentation generation capabilities. -""" - -import logging -from typing import Dict, Any, List, Optional -import json -import re -from datetime import datetime -import os -import hashlib -import base64 -import uuid -import shutil -from pathlib import Path -import traceback -import sys -import importlib.util -import inspect -from pydantic import BaseModel - -from modules.workflow.agentBase import AgentBase -from modules.interfaces.serviceChatModel import ChatContent - -logger = logging.getLogger(__name__) - -class AgentDocumentation(AgentBase): - """AI-driven agent for creating documentation and structured content using multi-step generation""" - - def __init__(self): - """Initialize the documentation agent""" - super().__init__() - self.name = "documentation" - self.label = "Documentation" - self.description = "Creates structured documentation, reports, and content using AI with multi-step generation" - self.capabilities = [ - "report_generation", - "documentation", - "content_structuring", - "technical_writing", - "knowledge_organization" - ] - - def setDependencies(self, serviceBase=None): - """Set external dependencies for the agent.""" - self.setService(serviceBase) - - async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: - """ - Process a task by focusing on required outputs and using AI to generate them. - - Args: - task: Task dictionary with prompt, inputDocuments, outputSpecifications - - Returns: - Dictionary with feedback and documents - """ - try: - # Extract task information - prompt = task.get("prompt", "") - inputDocuments = task.get("inputDocuments", []) - outputSpecs = task.get("outputSpecifications", []) - - # Check AI service - if not self.service or not self.service.base: - return { - "feedback": "The Documentation agent requires an AI service to function.", - "documents": [] - } - - # Extract context from input documents - focusing only on dataExtracted - documentContext = self._extractDocumentContext(inputDocuments) - - # Create task analysis to understand the requirements - documentationPlan = await self._analyzeTask(prompt, documentContext, outputSpecs) - logger.debug(f"Documentation plan: {documentationPlan}") - - # Generate all required output documents - documents = [] - - # If no output specs provided, create default document - if not outputSpecs: - defaultFormat = documentationPlan.get("recommendedFormat", "markdown") - defaultTitle = documentationPlan.get("title", "Documentation") - safeTitle = self._sanitizeFilename(defaultTitle) - - outputSpecs = [ - {"label": f"{safeTitle}.{defaultFormat}", "description": "Comprehensive documentation"} - ] - - # Process each output specification - for spec in outputSpecs: - outputLabel = spec.get("label", "") - outputDescription = spec.get("description", "") - - # Generate the document using multi-step approach - document = await self._createDocumentMultiStep( - prompt, - documentContext, - outputLabel, - outputDescription, - documentationPlan - ) - - documents.append(document) - - # Generate feedback - feedback = documentationPlan.get("feedback", f"Created {len(documents)} documents based on your requirements.") - - return { - "feedback": feedback, - "documents": documents - } - - except Exception as e: - logger.error(f"Error in documentation generation: {str(e)}", exc_info=True) - return { - "feedback": f"Error during documentation generation: {str(e)}", - "documents": [] - } - - def _extractDocumentContext(self, documents: List[Dict[str, Any]]) -> str: - """ - Extract context from input documents, focusing on dataExtracted. - - Args: - documents: List of document objects - - Returns: - Extracted context as text - """ - contextParts = [] - - for doc in documents: - docName = doc.get("name", "unnamed") - if doc.get("ext"): - docName = f"{docName}.{doc.get('ext')}" - - contextParts.append(f"\n\n--- {docName} ---\n") - - # Process contents for dataExtracted - for content in doc.get("contents", []): - if content.get("dataExtracted"): - contextParts.append(content.get("dataExtracted", "")) - - return "\n".join(contextParts) - - def _sanitizeFilename(self, filename: str) -> str: - """ - Sanitize a filename by removing invalid characters. - - Args: - filename: Filename to sanitize - - Returns: - Sanitized filename - """ - # Replace invalid characters with underscores - invalidChars = r'<>:"/\|?*' - for char in invalidChars: - filename = filename.replace(char, '_') - - # Trim filename if too long - if len(filename) > 100: - filename = filename[:97] + "..." - - return filename - - async def _analyzeTask(self, prompt: str, context: str, outputSpecs: List) -> Dict: - """ - Use AI to analyze the task and create a documentation plan. - - Args: - prompt: The task prompt - context: Document context - outputSpecs: Output specifications - - Returns: - Documentation plan dictionary - """ - analysisPrompt = f""" - Analyze this documentation task and create a detailed plan. - - TASK: {prompt} - - DOCUMENT CONTEXT SAMPLE: - {context[:1000]}... (truncated) - - OUTPUT REQUIREMENTS: - {json.dumps(outputSpecs, indent=2)} - - Create a detailed documentation plan in JSON format with the following structure: - {{ - "title": "Document Title", - "documentType": "report|manual|guide|whitepaper|etc", - "audience": "technical|general|executive|etc", - "detailedStructure": [ - {{ - "title": "Chapter/Section Title", - "keyPoints": ["point1", "point2", ...], - "subsections": ["subsection1", "subsection2", ...], - "importance": "high|medium|low", - "estimatedLength": "short|medium|long" - }}, - ... more sections ... - ], - "keyTopics": ["topic1", "topic2", ...], - "tone": "formal|conversational|instructional|etc", - "recommendedFormat": "markdown|html|text|etc", - "formattingRequirements": ["requirement1", "requirement2", ...], - "executiveSummary": "Brief description of what the document will cover", - "feedback": "Brief message explaining the documentation approach" - }} - - Only return valid JSON. No preamble or explanations. - """ - - try: - response = await self.service.base.callAi([ - {"role": "system", "content": "You are a documentation expert. Respond with valid JSON only."}, - {"role": "user", "content": analysisPrompt} - ]) - - # Extract JSON from response - jsonStart = response.find('{') - jsonEnd = response.rfind('}') + 1 - - if jsonStart >= 0 and jsonEnd > jsonStart: - plan = json.loads(response[jsonStart:jsonEnd]) - return plan - else: - # Fallback if JSON not found - return { - "title": "Documentation (DEFAULT)", - "documentType": "report", - "audience": "general", - "detailedStructure": [ - { - "title": "Introduction", - "keyPoints": ["Purpose", "Scope"], - "subsections": [], - "importance": "high", - "estimatedLength": "short" - }, - { - "title": "Main Content", - "keyPoints": ["Core Information"], - "subsections": ["Key Findings", "Analysis"], - "importance": "high", - "estimatedLength": "long" - }, - { - "title": "Conclusion", - "keyPoints": ["Summary", "Next Steps"], - "subsections": [], - "importance": "medium", - "estimatedLength": "short" - } - ], - "keyTopics": ["General Information"], - "tone": "formal", - "recommendedFormat": "markdown", - "formattingRequirements": ["Clear headings", "Professional formatting"], - "executiveSummary": "A comprehensive documentation covering the requested topics.", - "feedback": "Created documentation based on your requirements." - } - - except Exception as e: - logger.warning(f"Error creating documentation plan: {str(e)}") - return { - "title": "Documentation", - "documentType": "report", - "audience": "general", - "detailedStructure": [ - { - "title": "Introduction", - "keyPoints": ["Purpose", "Scope"], - "subsections": [], - "importance": "high", - "estimatedLength": "short" - }, - { - "title": "Main Content", - "keyPoints": ["Core Information"], - "subsections": ["Key Findings", "Analysis"], - "importance": "high", - "estimatedLength": "long" - }, - { - "title": "Conclusion", - "keyPoints": ["Summary", "Next Steps"], - "subsections": [], - "importance": "medium", - "estimatedLength": "short" - } - ], - "keyTopics": ["General Information"], - "tone": "formal", - "recommendedFormat": "markdown", - "formattingRequirements": ["Clear headings", "Professional formatting"], - "executiveSummary": "A comprehensive documentation covering the requested topics.", - "feedback": "Created documentation based on your requirements." - } - - async def _createDocumentMultiStep(self, prompt: str, context: str, outputLabel: str, - outputDescription: str, documentationPlan: Dict) -> ChatContent: - """ - Create a document using a multi-step approach with separate AI calls for each section. - - Args: - prompt: Original task prompt - context: Document context - outputLabel: Output filename - outputDescription: Description of desired output - documentationPlan: Documentation plan from AI - - Returns: - ChatContent object - """ - try: - # Determine format from filename - formatType = outputLabel.split('.')[-1].lower() if '.' in outputLabel else "md" - - # Map format to contentType - contentTypeMap = { - "md": "text/markdown", - "markdown": "text/markdown", - "html": "text/html", - "txt": "text/plain", - "text": "text/plain", - "json": "application/json", - "csv": "text/csv" - } - - contentType = contentTypeMap.get(formatType, "text/plain") - - # Get document information - title = documentationPlan.get("title", "Documentation") - documentType = documentationPlan.get("documentType", "document") - audience = documentationPlan.get("audience", "general") - tone = documentationPlan.get("tone", "formal") - keyTopics = documentationPlan.get("keyTopics", []) - formattingRequirements = documentationPlan.get("formattingRequirements", []) - - # Get the detailed structure - detailedStructure = documentationPlan.get("detailedStructure", []) - - # Step 1: Generate executive summary - summaryPrompt = f""" - Create an executive summary for a {documentType} titled "{title}". - - DOCUMENT OVERVIEW: - - Type: {documentType} - - Audience: {audience} - - Key Topics: {', '.join(keyTopics)} - - TASK CONTEXT: {prompt} - - The executive summary should: - 1. Provide a concise overview of the document's purpose - 2. Highlight key points and findings - 3. Be clear and engaging for the target audience - 4. Set expectations for the document's content - - Keep the summary brief but comprehensive. - """ - - executiveSummary = await self.service.base.callAi([ - {"role": "system", "content": f"You are a documentation expert creating an executive summary in {formatType} format."}, - {"role": "user", "content": summaryPrompt} - ], produceUserAnswer = True) - - # Step 2: Generate introduction - introPrompt = f""" - Create an introduction for a {documentType} titled "{title}". - - DOCUMENT OVERVIEW: - - Type: {documentType} - - Audience: {audience} - - Key Topics: {', '.join(keyTopics)} - - TASK CONTEXT: {prompt} - - The introduction should: - 1. Set the context and purpose of the document - 2. Outline the scope and objectives - 3. Preview the main topics to be covered - 4. Engage the reader's interest - - Format the introduction according to {formatType} standards. - """ - - introduction = await self.service.base.callAi([ - {"role": "system", "content": f"You are a documentation expert creating an introduction in {formatType} format."}, - {"role": "user", "content": introPrompt} - ], produceUserAnswer = True) - - # Step 3: Generate main sections - sections = [] - for section in detailedStructure: - sectionTitle = section.get("title", "Section") - keyPoints = section.get("keyPoints", []) - subsections = section.get("subsections", []) - importance = section.get("importance", "medium") - estimatedLength = section.get("estimatedLength", "medium") - - sectionPrompt = f""" - Create the {sectionTitle} section for a {documentType} titled "{title}". - - SECTION DETAILS: - - Title: {sectionTitle} - - Key Points: {', '.join(keyPoints)} - - Subsections: {', '.join(subsections)} - - Importance: {importance} - - Estimated Length: {estimatedLength} - - DOCUMENT CONTEXT: - - Type: {documentType} - - Audience: {audience} - - Key Topics: {', '.join(keyTopics)} - - TASK CONTEXT: {prompt} - - The section should: - 1. Cover all key points thoroughly - 2. Include relevant subsections - 3. Maintain appropriate depth based on importance - 4. Follow the document's tone and style - - Format the section according to {formatType} standards. - """ - - sectionContent = await self.service.base.callAi([ - {"role": "system", "content": f"You are a documentation expert creating a section in {formatType} format."}, - {"role": "user", "content": sectionPrompt} - ], produceUserAnswer = True) - - sections.append(sectionContent) - - # Step 4: Generate conclusion - conclusionPrompt = f""" - Create the conclusion for a {documentType} titled "{title}". - - DOCUMENT OVERVIEW: - - Type: {documentType} - - Audience: {audience} - - Key Topics: {', '.join(keyTopics)} - - TASK CONTEXT: {prompt} - - This conclusion should: - 1. Summarize the key points covered in the document - 2. Provide closure to the topics discussed - 3. Include any relevant recommendations or next steps - 4. Leave the reader with a clear understanding of the document's significance - - The conclusion should be professional and impactful, formatted according to {formatType} standards. - """ - - conclusion = await self.service.base.callAi([ - {"role": "system", "content": f"You are a documentation expert creating a conclusion in {formatType} format."}, - {"role": "user", "content": conclusionPrompt} - ], produceUserAnswer = True) - - # Step 5: Assemble the complete document - if formatType in ["md", "markdown"]: - # Markdown format - documentContent = f"# {title}\n\n" - - if executiveSummary: - documentContent += f"## Executive Summary\n\n{executiveSummary}\n\n" - - documentContent += f"{introduction}\n\n" - - for i, sectionContent in enumerate(sections): - # Ensure section starts with heading if not already - sectionTitle = detailedStructure[i].get("title", f"Section {i+1}") - if not sectionContent.strip().startswith("#"): - documentContent += f"## {sectionTitle}\n\n" - documentContent += f"{sectionContent}\n\n" - - documentContent += f"## Conclusion\n\n{conclusion}\n" - - elif formatType == "html": - # HTML format - documentContent = f"\n
\nThere was an error generating the documentation: {str(e)}
" - else: - content = f"Error in Documentation\n\nThere was an error generating the documentation: {str(e)}" - - return self.formatAgentDocumentOutput(outputLabel, content, contentType) - - -# Factory function for the Documentation agent -def getAgentDocumentation(): - """Returns an instance of the Documentation agent.""" - return AgentDocumentation() \ No newline at end of file diff --git a/modules/historic_data_agents/agentEmail.py b/modules/historic_data_agents/agentEmail.py deleted file mode 100644 index 6c6e2f5f..00000000 --- a/modules/historic_data_agents/agentEmail.py +++ /dev/null @@ -1,380 +0,0 @@ -""" -Email Agent Module. -Handles email-related tasks using Microsoft Graph API. -""" - -import logging -import json -from typing import Dict, Any, List, Optional, Tuple -import uuid -import os - -from modules.workflow.agentBase import AgentBase -from modules.interfaces.serviceChatModel import Task, ChatDocument, ChatContent - -logger = logging.getLogger(__name__) - -class AgentEmail(AgentBase): - """Agent for handling email-related tasks.""" - - def __init__(self): - """Initialize the email agent.""" - super().__init__() - self.name = "email" - self.label = "Email Agent" - self.description = "Handles email composition and sending using Microsoft Graph API" - self.capabilities = [ - "email_composition", - "email_draft_creation", - "email_template_generation" - ] - self.serviceBase = None - - def setDependencies(self, serviceBase=None): - """Set external dependencies for the agent.""" - self.serviceBase = serviceBase - - async def processTask(self, task: Task) -> Dict[str, Any]: - """ - Process an email-related task. - - Args: - task: Task object containing: - - prompt: Instructions for the agent - - inputDocuments: List of documents to process - - outputSpecifications: List of required output documents - - context: Additional context including workflow info - - Returns: - Dictionary containing: - - feedback: Text response explaining what was done - - documents: List of created documents - """ - try: - # Extract task information - prompt = task.prompt - inputDocuments = task.filesInput - outputSpecs = task.filesOutput - - # Check AI service - if not self.service.base: - return { - "feedback": "The Email agent requires an AI service to function.", - "documents": [] - } - - # Check if Microsoft connector is available - if not hasattr(self.service, 'msft'): - return { - "feedback": "Microsoft connector not available. Please ensure Microsoft integration is properly configured.", - "documents": [] - } - - # Get Microsoft token - token_data = self.service.msft.getMsftToken() - if not token_data: - # Create authentication trigger document - auth_doc = self._createFrontendAuthTriggerDocument() - return { - "feedback": "Microsoft authentication required. Please authenticate to continue.", - "documents": [auth_doc] - } - - # Extract document data from input - documentContents, attachments = self._processInputDocuments(inputDocuments) - - # Generate email subject and body using AI - emailTemplate = await self._generateEmailTemplate(prompt, documentContents) - - # Create HTML preview of the email - htmlPreview = self._createHtmlPreview(emailTemplate) - - # Attempt to create a draft email using Microsoft Graph API - draft_result = self.service.msft.createDraftEmail( - emailTemplate["recipient"], - emailTemplate["subject"], - emailTemplate["htmlBody"], - attachments - ) - - # Prepare output documents - documents = [] - - # Process output specifications - for spec in outputSpecs: - label = spec.get("label", "") - description = spec.get("description", "") - - if label.endswith(".html"): - # Create the HTML template file - templateDoc = self.formatAgentDocumentOutput( - label, - emailTemplate["htmlBody"], # Use the actual HTML body, not the preview - "text/html" - ) - documents.append(templateDoc) - elif label.endswith(".json"): - # Create JSON template if requested - templateJson = json.dumps(emailTemplate, indent=2) - templateDoc = self.formatAgentDocumentOutput( - label, - templateJson, - "application/json" - ) - documents.append(templateDoc) - else: - # Default to preview for other cases - previewDoc = self.formatAgentDocumentOutput( - label, - htmlPreview, - "text/html" - ) - documents.append(previewDoc) - - # Prepare feedback message - if draft_result: - feedback = f"Email draft created successfully for {emailTemplate.get('recipient')}. The subject is: '{emailTemplate['subject']}'" - if attachments: - feedback += f" with {len(attachments)} attachment(s)" - feedback += ". You can open and edit it in your Outlook draft folder." - else: - feedback = "Email template created but could not save as draft. HTML preview and template are available as documents." - - return { - "feedback": feedback, - "documents": documents - } - - except Exception as e: - logger.error(f"Error in email agent: {str(e)}") - return { - "feedback": f"Error processing email task: {str(e)}", - "documents": [] - } - - def _createFrontendAuthTriggerDocument(self) -> ChatDocument: - """Create a document that triggers Microsoft authentication in the frontend.""" - return ChatDocument( - id=str(uuid.uuid4()), - name="microsoft_auth", - ext="html", - data=""" -Please click the button below to authenticate with Microsoft:
- -Please click the button below to authenticate with Microsoft:
- -This email is regarding your request: {prompt}
" - } - - except Exception as e: - logger.warning(f"Error generating email template: {str(e)}") - return { - "recipient": "recipient@example.com", - "subject": "Information Regarding Your Request", - "plainBody": f"This email is regarding your request: {prompt}", - "htmlBody": f"This email is regarding your request: {prompt}
" - } - - def _createHtmlPreview(self, emailTemplate: Dict[str, Any]) -> str: - """ - Create an HTML preview of the email template. - - Args: - emailTemplate: Email template dictionary - - Returns: - HTML string for preview - """ - html = f""" - - - - -No content
')} -Please click the button below to authenticate with Microsoft:
- -An error occurred: {str(e)}
" - else: - content = f"WEB RESEARCH ERROR\n\nAn error occurred: {str(e)}" - - return self.formatAgentDocumentOutput(outputLabel, content, contentType) - - async def _createJsonDocument(self, prompt: str, results: List[Dict[str, Any]], - researchPlan: Dict[str, Any], outputLabel: str) -> Dict[str, Any]: - """ - Create a JSON document from research results. - - Args: - prompt: Original research prompt - results: Research results - researchPlan: Research plan - outputLabel: Output filename - - Returns: - Document object - """ - try: - # Create structured data - sourcesData = [] - for result in results: - sourcesData.append({ - "title": result.get("title", "Untitled"), - "url": result.get("url", ""), - "summary": result.get("summary", ""), - "snippet": result.get("snippet", ""), - "sourceType": result.get("sourceType", "") - }) - - # Create metadata - metadata = { - "query": prompt, - "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), - "researchQuestions": researchPlan.get("researchQuestions", []), - "searchTerms": researchPlan.get("searchTerms", []) - } - - # Compile complete report object - jsonContent = { - "metadata": metadata, - "summary": researchPlan.get("feedback", "Web research results"), - "sources": sourcesData - } - - # Convert to JSON string - content = json.dumps(jsonContent, indent=2) - - return self.formatAgentDocumentOutput(outputLabel, content, "application/json") - - except Exception as e: - logger.error(f"Error creating JSON document: {str(e)}") - return self.formatAgentDocumentOutput(outputLabel, json.dumps({"error": str(e)}), "application/json") - - async def _createCsvDocument(self, results: List[Dict[str, Any]], outputLabel: str) -> Dict[str, Any]: - """ - Create a CSV document from research results. - - Args: - results: Research results - outputLabel: Output filename - - Returns: - Document object - """ - try: - # Create CSV header - csvLines = ["Title,URL,Source Type,Snippet"] - - # Add results - for result in results: - # Escape CSV fields - title = result.get("title", "").replace('"', '""') - url = result.get("url", "").replace('"', '""') - sourceType = result.get("sourceType", "").replace('"', '""') - snippet = result.get("snippet", "").replace('"', '""') - - csvLines.append(f'"{title}","{url}","{sourceType}","{snippet}"') - - # Combine into CSV content - content = "\n".join(csvLines) - - return self.formatAgentDocumentOutput(outputLabel, content, "text/csv") - - except Exception as e: - logger.error(f"Error creating CSV document: {str(e)}") - return self.formatAgentDocumentOutput(outputLabel, "Error,Error\nFailed to create CSV,{0}".format(str(e)), "text/csv") - - def _determineFormatType(self, outputLabel: str) -> str: - """ - Determine the format type based on the filename. - - Args: - outputLabel: Output filename - - Returns: - Format type (markdown, html, text, json, csv) - """ - outputLabelLower = outputLabel.lower() - - if outputLabelLower.endswith(".md"): - return "markdown" - elif outputLabelLower.endswith(".html"): - return "html" - elif outputLabelLower.endswith(".txt"): - return "text" - elif outputLabelLower.endswith(".json"): - return "json" - elif outputLabelLower.endswith(".csv"): - return "csv" - else: - # Default to markdown - return "markdown" - - def _searchWeb(self, query: str) -> List[Dict[str, str]]: - """ - Conduct a web search using SerpAPI and return the results. - - Args: - query: The search query - - Returns: - List of search results - """ - if not self.srcApikey: - return [] - - # Get user language from serviceBase if available - userLanguage = "en" # Default language - if self.service.base.userLanguage: - userLanguage = self.service.base.userLanguage - - try: - # Format the search request for SerpAPI - params = { - "engine": self.srcEngine, - "q": query, - "api_key": self.srcApikey, - "num": self.maxResults, # Number of results to return - "hl": userLanguage # Identified user language - } - - # Make the API request - response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout) - response.raise_for_status() - - # Parse JSON response - search_results = response.json() - - # Extract organic results - results = [] - - if "organic_results" in search_results: - for result in search_results["organic_results"][:self.maxResults]: - # Extract title - title = result.get("title", "No title") - - # Extract URL - url = result.get("link", "No URL") - - # Extract snippet - snippet = result.get("snippet", "No description") - - # Get actual page content - try: - targetPageSoup = self._readUrl(url) - content = self._extractMainContent(targetPageSoup) - except Exception as e: - logger.warning(f"Error extracting content from {url}: {str(e)}") - content = f"Error extracting content: {str(e)}" - - results.append({ - 'title': title, - 'url': url, - 'snippet': snippet, - 'data': content - }) - - # Limit number of results - if len(results) >= self.maxResults: - break - else: - logger.warning(f"No organic results found in SerpAPI response for: {query}") - - return results - - except Exception as e: - logger.error(f"Error searching with SerpAPI for {query}: {str(e)}") - return [] - - def _readUrl(self, url: str) -> BeautifulSoup: - """ - Read a URL and return a BeautifulSoup parser for the content. - - Args: - url: The URL to read - - Returns: - BeautifulSoup object with the content or None on errors - """ - if not url or not url.startswith(('http://', 'https://')): - return None - - headers = { - 'User-Agent': self.userAgent, - 'Accept': 'text/html,application/xhtml+xml,application/xml', - 'Accept-Language': 'en-US,en;q=0.9', - } - - try: - # Initial request - response = requests.get(url, headers=headers, timeout=self.timeout) - - # Handling for status 202 - if response.status_code == 202: - # Retry with backoff - backoffTimes = [0.5, 1.0, 2.0, 5.0] - - for waitTime in backoffTimes: - time.sleep(waitTime) - response = requests.get(url, headers=headers, timeout=self.timeout) - - if response.status_code != 202: - break - - # Raise for error status codes - response.raise_for_status() - - # Parse HTML - return BeautifulSoup(response.text, 'html.parser') - - except Exception as e: - logger.error(f"Error reading URL {url}: {str(e)}") - return None - - def _extractTitle(self, soup: BeautifulSoup, url: str) -> str: - """ - Extract the title from a webpage. - - Args: - soup: BeautifulSoup object of the webpage - url: URL of the webpage - - Returns: - Extracted title - """ - if not soup: - return f"Error with {url}" - - # Extract title from title tag - titleTag = soup.find('title') - title = titleTag.text.strip() if titleTag else "No title" - - # Alternative: Also look for h1 tags if title tag is missing - if title == "No title": - h1Tag = soup.find('h1') - if h1Tag: - title = h1Tag.text.strip() - - return title - - def _extractMainContent(self, soup: BeautifulSoup, maxChars: int = 10000) -> str: - """ - Extract the main content from an HTML page. - - Args: - soup: BeautifulSoup object of the webpage - maxChars: Maximum number of characters - - Returns: - Extracted main content as a string - """ - if not soup: - return "" - - # Try to find main content elements in priority order - mainContent = None - for selector in ['main', 'article', '#content', '.content', '#main', '.main']: - content = soup.select_one(selector) - if content: - mainContent = content - break - - # If no main content found, use the body - if not mainContent: - mainContent = soup.find('body') or soup - - # Remove script, style, nav, footer elements that don't contribute to main content - for element in mainContent.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'): - element.extract() - - # Extract text content - textContent = mainContent.get_text(separator=' ', strip=True) - - # Limit to maxChars - return textContent[:maxChars] - - def _limitText(self, text: str, maxChars: int = 10000) -> str: - """ - Limit text to a maximum number of characters. - - Args: - text: Input text - maxChars: Maximum number of characters - - Returns: - Limited text - """ - if not text: - return "" - - # If text is already under the limit, return unchanged - if len(text) <= maxChars: - return text - - # Otherwise limit text to maxChars - return text[:maxChars] + "... [Content truncated due to length]" - - -# Factory function for the Webcrawler agent -def getAgentWebcrawler(): - """Returns an instance of the Webcrawler agent.""" - return AgentWebcrawler() \ No newline at end of file diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 5bb48da3..c29fd70e 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -6,7 +6,7 @@ Uses the JSON connector for data access with added language support. import os import logging import uuid -from datetime import datetime +from datetime import datetime, UTC from typing import Dict, Any, List, Optional, Union import asyncio @@ -327,6 +327,11 @@ class ChatObjects: publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()), stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None ) + + # Update workflow stats for message creation (estimate bytes for message) + message_size = len(createdMessage.get("message", "")) + sum(len(doc.get("filename", "")) for doc in createdMessage.get("documents", [])) + self.updateWorkflowStats(workflowId, bytesSent=0, bytesReceived=message_size) + except Exception as e: logger.error(f"Error creating workflow message: {str(e)}") return None @@ -535,6 +540,64 @@ class ChatObjects: # Get logs for this workflow return [ChatLog(**log) for log in self.db.getRecordset("workflowLogs", recordFilter={"workflowId": workflowId})] + def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool: + """Updates workflow statistics during execution with incremental values.""" + try: + # Get current workflow + workflow = self.getWorkflow(workflowId) + if not workflow: + logger.error(f"Workflow {workflowId} not found for stats update") + return False + + if not self._canModify("workflows", workflowId): + logger.error(f"No permission to update workflow {workflowId} stats") + return False + + # Get current stats + currentStats = workflow.stats.dict() if workflow.stats else { + "bytesSent": 0, + "bytesReceived": 0, + "tokenCount": 0, + "processingTime": 0 + } + + # Calculate processing time from workflow start + workflow_start = datetime.fromisoformat(workflow.startedAt.replace('Z', '+00:00')) + current_time = datetime.now(UTC) + processing_time = (current_time - workflow_start).total_seconds() + + # Update stats with incremental values + currentStats["bytesSent"] = currentStats.get("bytesSent", 0) + bytesSent + currentStats["bytesReceived"] = currentStats.get("bytesReceived", 0) + bytesReceived + currentStats["tokenCount"] = currentStats["bytesSent"] + currentStats["bytesReceived"] + currentStats["processingTime"] = processing_time + + # Update workflow in database + self.db.recordModify("workflows", workflowId, { + "dataStats": currentStats + }) + + # Log to stats table + stats_record = { + "timestamp": self._getCurrentTimestamp(), + "workflowId": workflowId, + "bytesSent": bytesSent, + "bytesReceived": bytesReceived, + "tokenCount": bytesSent + bytesReceived, + "processingTime": processing_time + } + + # Create stats record in database + self.db.recordCreate("stats", stats_record) + + logger.debug(f"Updated workflow {workflowId} stats: {currentStats}") + logger.debug(f"Logged stats record: {stats_record}") + return True + + except Exception as e: + logger.error(f"Error updating workflow stats: {str(e)}") + return False + def createWorkflowLog(self, logData: Dict[str, Any]) -> ChatLog: """Creates a log entry for a workflow if user has access.""" # Check workflow access @@ -777,14 +840,7 @@ class ChatObjects: # Create workflow workflow = self.createWorkflow(workflowData) - # Add log entry - self.createWorkflowLog({ - "workflowId": workflow.id, - "message": "Workflow started", - "type": "info", - "status": "running", - "progress": 0 - }) + # Remove the 'Workflow started' log entry # Start workflow processing from modules.workflow.managerWorkflow import WorkflowManager diff --git a/modules/methods/methodCoder.py b/modules/methods/EXCLUDED_methodCoder.py similarity index 93% rename from modules/methods/methodCoder.py rename to modules/methods/EXCLUDED_methodCoder.py index d9cc5289..33d285a0 100644 --- a/modules/methods/methodCoder.py +++ b/modules/methods/EXCLUDED_methodCoder.py @@ -10,9 +10,9 @@ logger = logging.getLogger(__name__) class MethodCoder(MethodBase): """Coder method implementation for code operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the coder method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "coder" self.description = "Handle code operations like analysis, generation, and refactoring" @@ -87,7 +87,18 @@ class MethodCoder(MethodBase): ) # Extract text content from ExtractedContent objects - text_contents = self.service.extractTextFromContentObjects(all_code_content) + text_contents = [] + for content_obj in all_code_content: + if hasattr(content_obj, 'contents') and content_obj.contents: + # Extract text from ContentItem objects + for content_item in content_obj.contents: + if hasattr(content_item, 'data') and content_item.data: + text_contents.append(content_item.data) + elif isinstance(content_obj, str): + text_contents.append(content_obj) + else: + # Fallback: convert to string representation + text_contents.append(str(content_obj)) # Combine all extracted text content for analysis combined_content = "\n\n--- CODE SEPARATOR ---\n\n".join(text_contents) diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py index a1b437de..208f736d 100644 --- a/modules/methods/methodDocument.py +++ b/modules/methods/methodDocument.py @@ -8,7 +8,6 @@ from typing import Dict, Any, List, Optional import uuid from datetime import datetime, UTC -from modules.workflow.managerDocument import DocumentManager from modules.workflow.methodBase import MethodBase, ActionResult, action logger = logging.getLogger(__name__) @@ -16,12 +15,11 @@ logger = logging.getLogger(__name__) class MethodDocument(MethodBase): """Document method implementation for document operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the document method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "document" self.description = "Handle document operations like extraction and analysis" - self.documentManager = DocumentManager(serviceContainer) @action async def extract(self, parameters: Dict[str, Any]) -> ActionResult: @@ -94,7 +92,18 @@ class MethodDocument(MethodBase): ) # Extract text content from ExtractedContent objects - text_contents = self.service.extractTextFromContentObjects(all_extracted_content) + text_contents = [] + for content_obj in all_extracted_content: + if hasattr(content_obj, 'contents') and content_obj.contents: + # Extract text from ContentItem objects + for content_item in content_obj.contents: + if hasattr(content_item, 'data') and content_item.data: + text_contents.append(content_item.data) + elif isinstance(content_obj, str): + text_contents.append(content_obj) + else: + # Fallback: convert to string representation + text_contents.append(str(content_obj)) # Combine all extracted text content combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(text_contents) diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py index f681931e..fb226731 100644 --- a/modules/methods/methodOutlook.py +++ b/modules/methods/methodOutlook.py @@ -16,9 +16,9 @@ logger = logging.getLogger(__name__) class MethodOutlook(MethodBase): """Outlook method implementation for email operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the Outlook method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "outlook" self.description = "Handle Microsoft Outlook email operations" diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py index cb36b57b..dbfc4c1f 100644 --- a/modules/methods/methodSharepoint.py +++ b/modules/methods/methodSharepoint.py @@ -16,8 +16,8 @@ logger = logging.getLogger(__name__) class MethodSharepoint(MethodBase): """SharePoint method implementation for document operations""" - def __init__(self, serviceContainer: Any): - super().__init__(serviceContainer) + def __init__(self, serviceCenter: Any): + super().__init__(serviceCenter) self.name = "sharepoint" self.description = "Handle Microsoft SharePoint document operations" diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py index e993ab55..4602a5a0 100644 --- a/modules/methods/methodWeb.py +++ b/modules/methods/methodWeb.py @@ -19,9 +19,9 @@ logger = logging.getLogger(__name__) class MethodWeb(MethodBase): """Web method implementation for web operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the web method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "web" self.description = "Handle web operations like crawling and scraping" @@ -452,7 +452,7 @@ class MethodWeb(MethodBase): "query": query } else: - # Get user language from service container if available + # Get user language from service center if available userLanguage = "en" # Default language if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'): userLanguage = self.service.user.language diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index de39a1c4..e01cfeb7 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -176,7 +176,7 @@ async def get_workflow_status( ) -> ChatWorkflow: """Get the current status of a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Retrieve workflow @@ -208,7 +208,7 @@ async def get_workflow_logs( ) -> List[ChatLog]: """Get logs for a workflow with support for selective data transfer.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -251,7 +251,7 @@ async def get_workflow_messages( ) -> List[ChatMessage]: """Get messages for a workflow with support for selective data transfer.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -297,7 +297,7 @@ async def start_workflow( Corresponds to State 1 in the state machine documentation. """ try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Start or continue workflow using ChatObjects @@ -322,7 +322,7 @@ async def stop_workflow( ) -> ChatWorkflow: """Stops a running workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Stop workflow using ChatObjects @@ -347,7 +347,7 @@ async def delete_workflow( ) -> Dict[str, Any]: """Deletes a workflow and its associated data.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Get raw workflow data from database to check permissions @@ -402,7 +402,7 @@ async def delete_workflow_message( ) -> Dict[str, Any]: """Delete a message from a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -453,7 +453,7 @@ async def delete_file_from_message( ) -> Dict[str, Any]: """Delete a file reference from a message in a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py index 864a7dc9..470653d8 100644 --- a/modules/workflow/managerChat.py +++ b/modules/workflow/managerChat.py @@ -2,6 +2,7 @@ import asyncio import logging import uuid import json +import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC @@ -9,7 +10,7 @@ from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) -from modules.workflow.serviceContainer import ServiceContainer +from modules.workflow.serviceCenter import ServiceCenter from modules.interfaces.interfaceChatObjects import ChatObjects logger = logging.getLogger(__name__) @@ -20,7 +21,7 @@ class ChatManager: def __init__(self, currentUser: User, chatInterface: ChatObjects): self.currentUser = currentUser self.chatInterface = chatInterface - self.service: ServiceContainer = None + self.service: ServiceCenter = None self.workflow: ChatWorkflow = None # Circuit breaker for AI calls @@ -37,7 +38,7 @@ class ChatManager: async def initialize(self, workflow: ChatWorkflow) -> None: """Initialize chat manager with workflow""" self.workflow = workflow - self.service = ServiceContainer(self.currentUser, self.workflow) + self.service = ServiceCenter(self.currentUser, self.workflow) # ===== WORKFLOW PHASES ===== @@ -119,6 +120,12 @@ class ChatManager: task_actions.append(task_action) logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}") + # Update stats for task validation (estimate bytes for action validation) + if task_actions: + # Calculate actual action size for stats + action_size = self.service.calculateObjectSize(task_actions) + self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size) + logger.info(f"Task action definition completed: {len(task_actions)} actions") return task_actions @@ -265,6 +272,7 @@ class ChatManager: async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: """Process file IDs and return ChatDocument objects""" documents = [] + for fileId in fileIds: try: # Ensure service is initialized @@ -290,6 +298,8 @@ class ChatManager: logger.warning(f"No file info found for file ID {fileId}") except Exception as e: logger.error(f"Error processing file ID {fileId}: {str(e)}") + + return documents def setUserLanguage(self, language: str) -> None: @@ -768,7 +778,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'documents_metadata': documents_metadata, 'actionId': action_result.get('actionId', ''), 'actionMethod': action_result.get('actionMethod', ''), - 'actionName': action_result.get('actionName', '') + 'actionName': action_result.get('actionName', ''), + 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none' } step_result_serializable['action_results'].append(serializable_action_result) @@ -787,6 +798,13 @@ INSTRUCTIONS: 4. Decide on next action: continue, retry, or fail 5. If retry, provide specific improvements needed +IMPORTANT NOTES: +- Actions can produce either text results OR documents (or both) +- Empty result_summary is acceptable if documents were produced (documents_count > 0) +- Focus on whether the action achieved its intended purpose, not just text output +- Document-based actions (like file extractions) often have empty text results but successful document outputs +- Check the 'success_indicator' field: 'documents' means success via document output, 'text_result' means success via text, 'none' means no output + REQUIRED JSON STRUCTURE: {{ "status": "success|retry|failed", @@ -829,7 +847,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]: """Execute a single action and return result with enhanced document processing""" try: - # Execute the actual method action using the service container + # Execute the actual method action using the service center result = await self.service.executeAction( methodName=action.execMethod, actionName=action.execAction, @@ -943,7 +961,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" message_data = { "workflowId": workflow.id, "role": "assistant", - "message": f"Executed {action.execMethod}.{action.execAction} successfully", + "message": f"Executed action {action.execMethod}.{action.execAction}", "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), @@ -979,7 +997,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" file_size = len(str(doc_data)) mime_type = "application/octet-stream" - # Enhanced MIME type detection using service container + # Enhanced MIME type detection using service center if mime_type == "application/octet-stream": mime_type = self._detectMimeTypeFromContent(document_data, document_name) @@ -1045,7 +1063,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" def _detectMimeTypeFromContent(self, content: Any, filename: str) -> str: """ - Detect MIME type from content and filename using service container. + Detect MIME type from content and filename using service center. Only returns a detected MIME type if it's better than application/octet-stream. Args: @@ -1065,7 +1083,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" else: file_bytes = str(content).encode('utf-8') - # Use service container's MIME type detection + # Use service center's MIME type detection detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type @@ -1076,7 +1094,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" def _detectMimeTypeFromDocument(self, document: Any, filename: str) -> str: """ - Detect MIME type from document object using service container. + Detect MIME type from document object using service center. Only returns a detected MIME type if it's better than application/octet-stream. Args: @@ -1094,7 +1112,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" else: file_bytes = str(content).encode('utf-8') - # Use service container's MIME type detection + # Use service center's MIME type detection detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type @@ -1222,8 +1240,11 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" action_results = review_context.get('action_results', []) if action_results: # Check for common issues that warrant retry + # Only consider empty results a problem if there are no documents produced has_empty_results = any( - not result.get('result', '').strip() + not result.get('result', '').strip() and + not result.get('documents', []) and + not result.get('documents_metadata', []) for result in action_results if result.get('status') == 'completed' ) @@ -1417,7 +1438,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant'] # Generate summary feedback - feedback = f"Workflow completed successfully.\n\n" + feedback = f"Workflow completed.\n\n" feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n" # Add final status @@ -1437,36 +1458,38 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # ===== UNIFIED WORKFLOW EXECUTION ===== async def executeUnifiedWorkflow(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]: - """Execute workflow using the new unified phases with retry logic""" + """Execute a unified workflow with all phases""" try: logger.info(f"Starting unified workflow execution for workflow {workflow.id}") + start_time = time.time() - # Create user-friendly progress log - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": "Starting workflow analysis and planning", - "type": "info", - "status": "running", - "progress": 5, - "agentName": "System" - }) + # Initialize chat manager with workflow + await self.initialize(workflow) + + # Process file IDs if provided + documents = [] + if hasattr(userInput, 'listFileId') and userInput.listFileId: + documents = await self.processFileIds(userInput.listFileId) + logger.info(f"Processed {len(documents)} documents") + + # Calculate and update user input stats + user_input_size = self.service.calculateUserInputSize(userInput) + self.service.updateWorkflowStats(eventLabel="userinput", bytesReceived=user_input_size) # Phase 1: High-Level Task Planning - logger.info("=== PHASE 1: HIGH-LEVEL TASK PLANNING ===") - task_plan = await self.planHighLevelTasks(userInput, workflow) - if not task_plan or not task_plan.get('tasks'): - logger.error("Failed to create task plan") - return { - 'status': 'failed', - 'error': 'Failed to create task plan', - 'phase': 'planning' - } + logger.info("--- PHASE 1: HIGH-LEVEL TASK PLANNING ---") + task_plan = await self.planHighLevelTasks(userInput.prompt, workflow) + + # Update stats for task planning + task_plan_size = self.service.calculateObjectSize(task_plan) + self.service.updateWorkflowStats(eventLabel="taskplan", bytesSent=task_plan_size) # Create user-friendly task plan log tasks_count = len(task_plan.get('tasks', [])) + task_descriptions = "\n".join([f"- {task.get('description', 'No description')}" for task in task_plan.get('tasks', [])]) self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Planning completed: {tasks_count} tasks identified", + "message": f"Planning completed: {tasks_count} tasks identified\n{task_descriptions}", "type": "info", "status": "running", "progress": 15, @@ -1598,22 +1621,29 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}") # Phase 3: Execute Task Actions - logger.info(f"--- PHASE 3: EXECUTING ACTIONS FOR TASK {i+1} ---") + logger.info(f"--- PHASE 3: EXECUTING TASK {i+1} ACTIONS ---") action_results = await self.executeTaskActions(task_actions, workflow) + # Update stats for action execution + # Action stats are already handled by the service center during AI calls + # Create user-friendly action completion log with quality metrics successful_actions = sum(1 for result in action_results if result.get('status') == 'completed') total_actions = len(action_results) if total_actions > 0: - quality_percentage = (successful_actions / total_actions) * 100 + if successful_actions == total_actions: + log_type = "success" + elif successful_actions == 0: + log_type = "error" + else: + log_type = "warning" self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} actions completed: {successful_actions}/{total_actions} successful ({quality_percentage:.0f}% quality)", - "type": "success" if quality_percentage >= 80 else "warning" if quality_percentage >= 60 else "error", + "message": f"Successful actions: {successful_actions}/{total_actions}", + "type": log_type, "status": "running", - "progress": progress + 10, - "agentName": "System" + "progress": progress + 10 }) # Log action results (with metadata only) @@ -1653,6 +1683,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---") review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow) + # Update stats for task review + # Task review stats are already handled by the service center during AI calls + # Create user-friendly review log with quality metrics quality_metrics = review_result.get('quality_metrics', {}) quality_score = quality_metrics.get('score', 0) @@ -1662,29 +1695,62 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" if review_status == 'success': self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} completed successfully (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": f"🎯 Task completed successfully with quality score {quality_score} and confidence {confidence}", "type": "success", "status": "running", - "progress": progress + 20, - "agentName": "System" + "progress": progress + 20 }) elif review_status == 'retry': + # Extract improvement details + improvements = review_result.get('improvements', '') + reason = review_result.get('reason', '') + unmet_criteria = review_result.get('unmet_criteria', []) + + # Build detailed message + retry_details = [] + if reason: + retry_details.append(f"Reason: {reason}") + if improvements: + retry_details.append(f"Improvements: {improvements}") + if unmet_criteria: + retry_details.append(f"Missing criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") + + retry_message = f"🔄 Task needs improvement" + if retry_details: + retry_message += f"\n{chr(10).join(retry_details)}" + self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} needs improvement (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": retry_message, "type": "warning", "status": "running", - "progress": progress + 15, - "agentName": "System" + "progress": progress + 15 }) else: + # Extract failure details + reason = review_result.get('reason', '') + unmet_criteria = review_result.get('unmet_criteria', []) + missing_outputs = review_result.get('missing_outputs', []) + + # Build detailed failure message + failure_details = [] + if reason: + failure_details.append(f"Reason: {reason}") + if unmet_criteria: + failure_details.append(f"Unmet criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") + if missing_outputs: + failure_details.append(f"Missing outputs: {', '.join(missing_outputs[:3])}{'...' if len(missing_outputs) > 3 else ''}") + + failure_message = f"❌ Task failed" + if failure_details: + failure_message += f"\n{chr(10).join(failure_details)}" + self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} failed (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": failure_message, "type": "error", "status": "running", - "progress": progress + 15, - "agentName": "System" + "progress": progress + 15 }) # Log review result (with metadata only) @@ -1724,7 +1790,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" previous_review_feedback = review_result.get('improvements', '') retry_count += 1 - if retry_count >= max_retries: + if retry_count > max_retries: logger.error(f"Task {i+1} failed after {max_retries} retries") task_success = False else: @@ -1775,35 +1841,37 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # Final workflow summary successful_tasks = sum(1 for result in workflow_results if result.get('task_success', False)) - total_tasks = len(workflow_results) + total_tasks = len(task_plan['tasks']) + + # Final workflow stats are already handled by the service center during AI calls + + # Calculate total processing time + total_processing_time = time.time() - start_time # Create final user-friendly completion log if successful_tasks == total_tasks: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow completed successfully: {successful_tasks}/{total_tasks} tasks completed", + "message": f"🎉 Workflow completed ({successful_tasks}/{total_tasks} tasks)", "type": "success", "status": "completed", - "progress": 100, - "agentName": "System" + "progress": 100 }) elif successful_tasks > 0: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow completed partially: {successful_tasks}/{total_tasks} tasks completed", + "message": f"⚠️ Workflow partially completed ({successful_tasks}/{total_tasks} tasks)", "type": "warning", "status": "completed", - "progress": 100, - "agentName": "System" + "progress": 100 }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow failed: {successful_tasks}/{total_tasks} tasks completed", + "message": f"❌ Workflow failed ({successful_tasks}/{total_tasks} tasks)", "type": "error", "status": "failed", - "progress": 100, - "agentName": "System" + "progress": 100 }) # Create serializable workflow results (with metadata only) @@ -1836,7 +1904,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'documents_metadata': documents_metadata, 'actionId': action_result.get('actionId', ''), 'actionMethod': action_result.get('actionMethod', ''), - 'actionName': action_result.get('actionName', '') + 'actionName': action_result.get('actionName', ''), + 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none' } action_results_metadata.append(action_result_metadata) diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py deleted file mode 100644 index b1b8e709..00000000 --- a/modules/workflow/managerDocument.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Document Manager Module for handling document operations and content extraction. -""" - -import logging - -from modules.interfaces.interfaceChatModel import ( - ChatDocument, - ExtractedContent -) -from modules.workflow.processorDocument import DocumentProcessor - -logger = logging.getLogger(__name__) - -class DocumentManager: - """Manager for document operations and content extraction""" - - def __init__(self, serviceContainer): - self.service = serviceContainer - # Create processor with service container for AI calls - self._processor = DocumentProcessor(serviceContainer) - - async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: - """Extract content from ChatDocument using prompt""" - try: - # Extract file data from ChatDocument - if document.data: - fileData = document.data.encode('utf-8') if isinstance(document.data, str) else document.data - else: - # Try to get file data from service container if document has fileId - if hasattr(document, 'fileId') and document.fileId: - fileData = self.service.getFileData(document.fileId) - else: - logger.error(f"No file data available in document: {document}") - raise ValueError("No file data available in document") - - # Get filename and mime type from document - filename = document.filename if hasattr(document, 'filename') else "document" - mimeType = document.mimeType if hasattr(document, 'mimeType') else "application/octet-stream" - - # Process with processor - extractedContent = await self._processor.processFileData( - fileData=fileData, - filename=filename, - mimeType=mimeType, - base64Encoded=False, - prompt=prompt - ) - - # Update objectId to match document ID - extractedContent.objectId = document.id - extractedContent.objectType = "ChatDocument" - - return extractedContent - - except Exception as e: - logger.error(f"Error extracting from document: {str(e)}") - raise - - async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> ExtractedContent: - """Extract content from file data directly using prompt""" - try: - return await self._processor.processFileData( - fileData=fileData, - filename=filename, - mimeType=mimeType, - base64Encoded=base64Encoded, - prompt=prompt, - documentId=documentId - ) - except Exception as e: - logger.error(f"Error extracting from file data: {str(e)}") - raise diff --git a/modules/workflow/methodBase.py b/modules/workflow/methodBase.py index fe109512..8f09cb52 100644 --- a/modules/workflow/methodBase.py +++ b/modules/workflow/methodBase.py @@ -20,9 +20,9 @@ def action(func): class MethodBase: """Base class for all methods""" - def __init__(self, serviceContainer: Any): - """Initialize method with service container""" - self.service = serviceContainer + def __init__(self, serviceCenter: Any): + """Initialize method with service center""" + self.service = serviceCenter self.name: str self.description: str self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py index 75929f86..323c6f7f 100644 --- a/modules/workflow/processorDocument.py +++ b/modules/workflow/processorDocument.py @@ -32,10 +32,10 @@ class FileProcessingError(Exception): class DocumentProcessor: """Processor for handling document operations and content extraction.""" - def __init__(self, serviceContainer=None): + def __init__(self, serviceCenter=None): """Initialize the document processor.""" self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None - self._serviceContainer = serviceContainer + self._serviceCenter = serviceCenter self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = { 'text/plain': self._processText, @@ -136,7 +136,7 @@ class DocumentProcessor: # Detect content type if needed if mimeType == "application/octet-stream": - mimeType = self._serviceContainer.detectContentTypeFromData(fileData, filename) + mimeType = self._serviceCenter.detectContentTypeFromData(fileData, filename) # Process document based on type if mimeType not in self.supportedTypes: @@ -527,7 +527,7 @@ class DocumentProcessor: # chunk is already base64 encoded string from _processImage # Use the original prompt directly for images (no content embedding) logger.debug(f"Calling image AI service for MIME type: {mimeType}") - processedContent = await self._serviceContainer.callAiImageBasic(prompt, chunk, mimeType) + processedContent = await self._serviceCenter.callAiImageBasic(prompt, chunk, mimeType) else: # For text content, use text AI service # Neutralize content if neutralizer is enabled (only for text) @@ -548,7 +548,7 @@ class DocumentProcessor: """ logger.debug(f"Calling text AI service for MIME type: {mimeType}") - processedContent = await self._serviceContainer.callAiTextBasic(aiPrompt, contentToProcess) + processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess) chunkResults.append(processedContent) except Exception as aiError: diff --git a/modules/workflow/serviceContainer.py b/modules/workflow/serviceCenter.py similarity index 76% rename from modules/workflow/serviceContainer.py rename to modules/workflow/serviceCenter.py index ea2f87b1..d403aca9 100644 --- a/modules/workflow/serviceContainer.py +++ b/modules/workflow/serviceCenter.py @@ -8,14 +8,14 @@ from modules.interfaces.interfaceAppModel import User, UserConnection from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, - ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange + ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent ) from modules.interfaces.interfaceAiCalls import AiCalls from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects -from modules.workflow.managerDocument import DocumentManager +from modules.workflow.processorDocument import DocumentProcessor from modules.workflow.methodBase import MethodBase import uuid import base64 @@ -23,8 +23,8 @@ import hashlib logger = logging.getLogger(__name__) -class ServiceContainer: - """Service container that provides access to all services and their functions""" +class ServiceCenter: + """Service center that provides access to all services and their functions""" def __init__(self, currentUser: User, workflow: ChatWorkflow): # Core services @@ -39,7 +39,7 @@ class ServiceContainer: self.interfaceComponent = getComponentObjects(currentUser) self.interfaceApp = getAppObjects(currentUser) self.interfaceAiCalls = AiCalls() - self.documentManager = DocumentManager(self) + self.documentProcessor = DocumentProcessor(self) # Initialize methods catalog self.methods = {} @@ -115,7 +115,7 @@ class ServiceContainer: def detectContentTypeFromData(self, fileData: bytes, filename: str) -> str: """ Detect content type from file data and filename. - This method makes the MIME type detection function accessible through the service container. + This method makes the MIME type detection function accessible through the service center. Args: fileData: Raw file data as bytes @@ -263,17 +263,11 @@ class ServiceContainer: # ===== Functions ===== - def extractContent(self, prompt: str, document: ChatDocument) -> str: + def extractContent(self, prompt: str, document: ChatDocument) -> ExtractedContent: """Extract content from document using prompt""" - return self.documentManager.extractContentFromDocument(prompt, document) + return self.extractContentFromDocument(prompt, document) - async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> str: - """Extract content from file data directly using prompt""" - extracted_content = await self.documentManager.extractContentFromFileData(prompt, fileData, filename, mimeType, base64Encoded, documentId) - # Convert ExtractedContent to string for backward compatibility - if hasattr(extracted_content, 'contents'): - return "\n".join([item.data for item in extracted_content.contents]) - return str(extracted_content) + def getMethodsCatalog(self) -> Dict[str, Any]: """Get catalog of available methods and their actions""" @@ -502,7 +496,7 @@ Instructions: Please provide a comprehensive summary of this conversation.""" # Get summary using AI - return await self.interfaceAiCalls.callAiTextBasic(prompt) + return await self.callAiTextBasic(prompt) except Exception as e: logger.error(f"Error summarizing chat: {str(e)}") @@ -535,27 +529,81 @@ Instructions: Please provide a clear summary of this message.""" # Get summary using AI - return await self.interfaceAiCalls.callAiTextBasic(prompt) + return await self.callAiTextBasic(prompt) except Exception as e: logger.error(f"Error summarizing message: {str(e)}") return f"Error summarizing message: {str(e)}" - def callAiTextBasic(self, prompt: str, context: str = None) -> str: + async def callAiTextBasic(self, prompt: str, context: str = None) -> str: """Basic text processing using OpenAI""" - return self.interfaceAiCalls.callAiTextBasic(prompt, context) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + if context: + prompt_size += self.calculateObjectSize(context) + + # Call AI + response = await self.interfaceAiCalls.callAiTextBasic(prompt, context) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.openai.text", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: + async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: """Advanced text processing using Anthropic""" - return self.interfaceAiCalls.callAiTextAdvanced(prompt, context) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + if context: + prompt_size += self.calculateObjectSize(context) + + # Call AI + response = await self.interfaceAiCalls.callAiTextAdvanced(prompt, context) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.anthropic.text", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiImageBasic(self, prompt: str, imageData: str, mimeType: str) -> str: + async def callAiImageBasic(self, prompt: str, imageData: str, mimeType: str) -> str: """Basic image processing using OpenAI""" - return self.interfaceAiCalls.callAiImageBasic(prompt, imageData, mimeType) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + prompt_size += self.calculateObjectSize(imageData) + + # Call AI + response = await self.interfaceAiCalls.callAiImageBasic(prompt, imageData, mimeType) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.openai.image", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiImageAdvanced(self, prompt: str, imageData: str, mimeType: str) -> str: + async def callAiImageAdvanced(self, prompt: str, imageData: str, mimeType: str) -> str: """Advanced image processing using Anthropic""" - return self.interfaceAiCalls.callAiImageAdvanced(prompt, imageData, mimeType) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + prompt_size += self.calculateObjectSize(imageData) + + # Call AI + response = await self.interfaceAiCalls.callAiImageAdvanced(prompt, imageData, mimeType) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.anthropic.image", bytesSent=prompt_size, bytesReceived=response_size) + + return response def getFileInfo(self, fileId: str) -> Dict[str, Any]: """Get file information""" @@ -575,6 +623,59 @@ Please provide a clear summary of this message.""" """Get file data by ID""" return self.interfaceComponent.getFileData(fileId) + async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: + """Extract content from ChatDocument using prompt""" + try: + # Extract file data from ChatDocument + if document.data: + fileData = document.data.encode('utf-8') if isinstance(document.data, str) else document.data + else: + # Try to get file data from service center if document has fileId + if hasattr(document, 'fileId') and document.fileId: + fileData = self.getFileData(document.fileId) + else: + logger.error(f"No file data available in document: {document}") + raise ValueError("No file data available in document") + + # Get filename and mime type from document + filename = document.filename if hasattr(document, 'filename') else "document" + mimeType = document.mimeType if hasattr(document, 'mimeType') else "application/octet-stream" + + # Process with document processor directly + extractedContent = await self.documentProcessor.processFileData( + fileData=fileData, + filename=filename, + mimeType=mimeType, + base64Encoded=False, + prompt=prompt, + documentId=document.id + ) + + # Update objectId to match document ID + extractedContent.objectId = document.id + extractedContent.objectType = "ChatDocument" + + return extractedContent + + except Exception as e: + logger.error(f"Error extracting from document: {str(e)}") + raise + + async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> ExtractedContent: + """Extract content from file data directly using prompt""" + try: + return await self.documentProcessor.processFileData( + prompt=prompt, + fileData=fileData, + filename=filename, + mimeType=mimeType, + base64Encoded=base64Encoded, + documentId=documentId + ) + except Exception as e: + logger.error(f"Error extracting from file data: {str(e)}") + raise + def createFile(self, fileName: str, mimeType: str, content: str, base64encoded: bool = False) -> str: """Create new file and return its ID""" # Convert content to bytes based on base64 flag @@ -613,29 +714,85 @@ Please provide a clear summary of this message.""" mimeType=mimeType ) - def extractTextFromContentObjects(self, content_objects: List[Any]) -> List[str]: + def updateWorkflowStats(self, eventLabel: str = None, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None: """ - Extract text content from ExtractedContent objects or other content objects. + Centralized function to update workflow statistics in database and running workflow. Args: - content_objects: List of ExtractedContent objects or other content objects + eventLabel: Label for the event (e.g., "userinput", "taskplan", "action", "aicall