From d1aac4099d1dbe684a17a8db2065bf4d7dfe15fc Mon Sep 17 00:00:00 2001 From: ValueOn AG
There was an error generating the documentation: {str(e)}
" - else: - content = f"Error in Documentation\n\nThere was an error generating the documentation: {str(e)}" - - return self.formatAgentDocumentOutput(outputLabel, content, contentType) - - -# Factory function for the Documentation agent -def getAgentDocumentation(): - """Returns an instance of the Documentation agent.""" - return AgentDocumentation() \ No newline at end of file diff --git a/modules/historic_data_agents/agentEmail.py b/modules/historic_data_agents/agentEmail.py deleted file mode 100644 index 6c6e2f5f..00000000 --- a/modules/historic_data_agents/agentEmail.py +++ /dev/null @@ -1,380 +0,0 @@ -""" -Email Agent Module. -Handles email-related tasks using Microsoft Graph API. -""" - -import logging -import json -from typing import Dict, Any, List, Optional, Tuple -import uuid -import os - -from modules.workflow.agentBase import AgentBase -from modules.interfaces.serviceChatModel import Task, ChatDocument, ChatContent - -logger = logging.getLogger(__name__) - -class AgentEmail(AgentBase): - """Agent for handling email-related tasks.""" - - def __init__(self): - """Initialize the email agent.""" - super().__init__() - self.name = "email" - self.label = "Email Agent" - self.description = "Handles email composition and sending using Microsoft Graph API" - self.capabilities = [ - "email_composition", - "email_draft_creation", - "email_template_generation" - ] - self.serviceBase = None - - def setDependencies(self, serviceBase=None): - """Set external dependencies for the agent.""" - self.serviceBase = serviceBase - - async def processTask(self, task: Task) -> Dict[str, Any]: - """ - Process an email-related task. - - Args: - task: Task object containing: - - prompt: Instructions for the agent - - inputDocuments: List of documents to process - - outputSpecifications: List of required output documents - - context: Additional context including workflow info - - Returns: - Dictionary containing: - - feedback: Text response explaining what was done - - documents: List of created documents - """ - try: - # Extract task information - prompt = task.prompt - inputDocuments = task.filesInput - outputSpecs = task.filesOutput - - # Check AI service - if not self.service.base: - return { - "feedback": "The Email agent requires an AI service to function.", - "documents": [] - } - - # Check if Microsoft connector is available - if not hasattr(self.service, 'msft'): - return { - "feedback": "Microsoft connector not available. Please ensure Microsoft integration is properly configured.", - "documents": [] - } - - # Get Microsoft token - token_data = self.service.msft.getMsftToken() - if not token_data: - # Create authentication trigger document - auth_doc = self._createFrontendAuthTriggerDocument() - return { - "feedback": "Microsoft authentication required. Please authenticate to continue.", - "documents": [auth_doc] - } - - # Extract document data from input - documentContents, attachments = self._processInputDocuments(inputDocuments) - - # Generate email subject and body using AI - emailTemplate = await self._generateEmailTemplate(prompt, documentContents) - - # Create HTML preview of the email - htmlPreview = self._createHtmlPreview(emailTemplate) - - # Attempt to create a draft email using Microsoft Graph API - draft_result = self.service.msft.createDraftEmail( - emailTemplate["recipient"], - emailTemplate["subject"], - emailTemplate["htmlBody"], - attachments - ) - - # Prepare output documents - documents = [] - - # Process output specifications - for spec in outputSpecs: - label = spec.get("label", "") - description = spec.get("description", "") - - if label.endswith(".html"): - # Create the HTML template file - templateDoc = self.formatAgentDocumentOutput( - label, - emailTemplate["htmlBody"], # Use the actual HTML body, not the preview - "text/html" - ) - documents.append(templateDoc) - elif label.endswith(".json"): - # Create JSON template if requested - templateJson = json.dumps(emailTemplate, indent=2) - templateDoc = self.formatAgentDocumentOutput( - label, - templateJson, - "application/json" - ) - documents.append(templateDoc) - else: - # Default to preview for other cases - previewDoc = self.formatAgentDocumentOutput( - label, - htmlPreview, - "text/html" - ) - documents.append(previewDoc) - - # Prepare feedback message - if draft_result: - feedback = f"Email draft created successfully for {emailTemplate.get('recipient')}. The subject is: '{emailTemplate['subject']}'" - if attachments: - feedback += f" with {len(attachments)} attachment(s)" - feedback += ". You can open and edit it in your Outlook draft folder." - else: - feedback = "Email template created but could not save as draft. HTML preview and template are available as documents." - - return { - "feedback": feedback, - "documents": documents - } - - except Exception as e: - logger.error(f"Error in email agent: {str(e)}") - return { - "feedback": f"Error processing email task: {str(e)}", - "documents": [] - } - - def _createFrontendAuthTriggerDocument(self) -> ChatDocument: - """Create a document that triggers Microsoft authentication in the frontend.""" - return ChatDocument( - id=str(uuid.uuid4()), - name="microsoft_auth", - ext="html", - data=""" -Please click the button below to authenticate with Microsoft:
- -Please click the button below to authenticate with Microsoft:
- -This email is regarding your request: {prompt}
" - } - - except Exception as e: - logger.warning(f"Error generating email template: {str(e)}") - return { - "recipient": "recipient@example.com", - "subject": "Information Regarding Your Request", - "plainBody": f"This email is regarding your request: {prompt}", - "htmlBody": f"This email is regarding your request: {prompt}
" - } - - def _createHtmlPreview(self, emailTemplate: Dict[str, Any]) -> str: - """ - Create an HTML preview of the email template. - - Args: - emailTemplate: Email template dictionary - - Returns: - HTML string for preview - """ - html = f""" - - - - -No content
')} -Please click the button below to authenticate with Microsoft:
- -An error occurred: {str(e)}
" - else: - content = f"WEB RESEARCH ERROR\n\nAn error occurred: {str(e)}" - - return self.formatAgentDocumentOutput(outputLabel, content, contentType) - - async def _createJsonDocument(self, prompt: str, results: List[Dict[str, Any]], - researchPlan: Dict[str, Any], outputLabel: str) -> Dict[str, Any]: - """ - Create a JSON document from research results. - - Args: - prompt: Original research prompt - results: Research results - researchPlan: Research plan - outputLabel: Output filename - - Returns: - Document object - """ - try: - # Create structured data - sourcesData = [] - for result in results: - sourcesData.append({ - "title": result.get("title", "Untitled"), - "url": result.get("url", ""), - "summary": result.get("summary", ""), - "snippet": result.get("snippet", ""), - "sourceType": result.get("sourceType", "") - }) - - # Create metadata - metadata = { - "query": prompt, - "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), - "researchQuestions": researchPlan.get("researchQuestions", []), - "searchTerms": researchPlan.get("searchTerms", []) - } - - # Compile complete report object - jsonContent = { - "metadata": metadata, - "summary": researchPlan.get("feedback", "Web research results"), - "sources": sourcesData - } - - # Convert to JSON string - content = json.dumps(jsonContent, indent=2) - - return self.formatAgentDocumentOutput(outputLabel, content, "application/json") - - except Exception as e: - logger.error(f"Error creating JSON document: {str(e)}") - return self.formatAgentDocumentOutput(outputLabel, json.dumps({"error": str(e)}), "application/json") - - async def _createCsvDocument(self, results: List[Dict[str, Any]], outputLabel: str) -> Dict[str, Any]: - """ - Create a CSV document from research results. - - Args: - results: Research results - outputLabel: Output filename - - Returns: - Document object - """ - try: - # Create CSV header - csvLines = ["Title,URL,Source Type,Snippet"] - - # Add results - for result in results: - # Escape CSV fields - title = result.get("title", "").replace('"', '""') - url = result.get("url", "").replace('"', '""') - sourceType = result.get("sourceType", "").replace('"', '""') - snippet = result.get("snippet", "").replace('"', '""') - - csvLines.append(f'"{title}","{url}","{sourceType}","{snippet}"') - - # Combine into CSV content - content = "\n".join(csvLines) - - return self.formatAgentDocumentOutput(outputLabel, content, "text/csv") - - except Exception as e: - logger.error(f"Error creating CSV document: {str(e)}") - return self.formatAgentDocumentOutput(outputLabel, "Error,Error\nFailed to create CSV,{0}".format(str(e)), "text/csv") - - def _determineFormatType(self, outputLabel: str) -> str: - """ - Determine the format type based on the filename. - - Args: - outputLabel: Output filename - - Returns: - Format type (markdown, html, text, json, csv) - """ - outputLabelLower = outputLabel.lower() - - if outputLabelLower.endswith(".md"): - return "markdown" - elif outputLabelLower.endswith(".html"): - return "html" - elif outputLabelLower.endswith(".txt"): - return "text" - elif outputLabelLower.endswith(".json"): - return "json" - elif outputLabelLower.endswith(".csv"): - return "csv" - else: - # Default to markdown - return "markdown" - - def _searchWeb(self, query: str) -> List[Dict[str, str]]: - """ - Conduct a web search using SerpAPI and return the results. - - Args: - query: The search query - - Returns: - List of search results - """ - if not self.srcApikey: - return [] - - # Get user language from serviceBase if available - userLanguage = "en" # Default language - if self.service.base.userLanguage: - userLanguage = self.service.base.userLanguage - - try: - # Format the search request for SerpAPI - params = { - "engine": self.srcEngine, - "q": query, - "api_key": self.srcApikey, - "num": self.maxResults, # Number of results to return - "hl": userLanguage # Identified user language - } - - # Make the API request - response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout) - response.raise_for_status() - - # Parse JSON response - search_results = response.json() - - # Extract organic results - results = [] - - if "organic_results" in search_results: - for result in search_results["organic_results"][:self.maxResults]: - # Extract title - title = result.get("title", "No title") - - # Extract URL - url = result.get("link", "No URL") - - # Extract snippet - snippet = result.get("snippet", "No description") - - # Get actual page content - try: - targetPageSoup = self._readUrl(url) - content = self._extractMainContent(targetPageSoup) - except Exception as e: - logger.warning(f"Error extracting content from {url}: {str(e)}") - content = f"Error extracting content: {str(e)}" - - results.append({ - 'title': title, - 'url': url, - 'snippet': snippet, - 'data': content - }) - - # Limit number of results - if len(results) >= self.maxResults: - break - else: - logger.warning(f"No organic results found in SerpAPI response for: {query}") - - return results - - except Exception as e: - logger.error(f"Error searching with SerpAPI for {query}: {str(e)}") - return [] - - def _readUrl(self, url: str) -> BeautifulSoup: - """ - Read a URL and return a BeautifulSoup parser for the content. - - Args: - url: The URL to read - - Returns: - BeautifulSoup object with the content or None on errors - """ - if not url or not url.startswith(('http://', 'https://')): - return None - - headers = { - 'User-Agent': self.userAgent, - 'Accept': 'text/html,application/xhtml+xml,application/xml', - 'Accept-Language': 'en-US,en;q=0.9', - } - - try: - # Initial request - response = requests.get(url, headers=headers, timeout=self.timeout) - - # Handling for status 202 - if response.status_code == 202: - # Retry with backoff - backoffTimes = [0.5, 1.0, 2.0, 5.0] - - for waitTime in backoffTimes: - time.sleep(waitTime) - response = requests.get(url, headers=headers, timeout=self.timeout) - - if response.status_code != 202: - break - - # Raise for error status codes - response.raise_for_status() - - # Parse HTML - return BeautifulSoup(response.text, 'html.parser') - - except Exception as e: - logger.error(f"Error reading URL {url}: {str(e)}") - return None - - def _extractTitle(self, soup: BeautifulSoup, url: str) -> str: - """ - Extract the title from a webpage. - - Args: - soup: BeautifulSoup object of the webpage - url: URL of the webpage - - Returns: - Extracted title - """ - if not soup: - return f"Error with {url}" - - # Extract title from title tag - titleTag = soup.find('title') - title = titleTag.text.strip() if titleTag else "No title" - - # Alternative: Also look for h1 tags if title tag is missing - if title == "No title": - h1Tag = soup.find('h1') - if h1Tag: - title = h1Tag.text.strip() - - return title - - def _extractMainContent(self, soup: BeautifulSoup, maxChars: int = 10000) -> str: - """ - Extract the main content from an HTML page. - - Args: - soup: BeautifulSoup object of the webpage - maxChars: Maximum number of characters - - Returns: - Extracted main content as a string - """ - if not soup: - return "" - - # Try to find main content elements in priority order - mainContent = None - for selector in ['main', 'article', '#content', '.content', '#main', '.main']: - content = soup.select_one(selector) - if content: - mainContent = content - break - - # If no main content found, use the body - if not mainContent: - mainContent = soup.find('body') or soup - - # Remove script, style, nav, footer elements that don't contribute to main content - for element in mainContent.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'): - element.extract() - - # Extract text content - textContent = mainContent.get_text(separator=' ', strip=True) - - # Limit to maxChars - return textContent[:maxChars] - - def _limitText(self, text: str, maxChars: int = 10000) -> str: - """ - Limit text to a maximum number of characters. - - Args: - text: Input text - maxChars: Maximum number of characters - - Returns: - Limited text - """ - if not text: - return "" - - # If text is already under the limit, return unchanged - if len(text) <= maxChars: - return text - - # Otherwise limit text to maxChars - return text[:maxChars] + "... [Content truncated due to length]" - - -# Factory function for the Webcrawler agent -def getAgentWebcrawler(): - """Returns an instance of the Webcrawler agent.""" - return AgentWebcrawler() \ No newline at end of file diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 5bb48da3..c29fd70e 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -6,7 +6,7 @@ Uses the JSON connector for data access with added language support. import os import logging import uuid -from datetime import datetime +from datetime import datetime, UTC from typing import Dict, Any, List, Optional, Union import asyncio @@ -327,6 +327,11 @@ class ChatObjects: publishedAt=createdMessage.get("publishedAt", self._getCurrentTimestamp()), stats=ChatStat(**createdMessage.get("stats", {})) if createdMessage.get("stats") else None ) + + # Update workflow stats for message creation (estimate bytes for message) + message_size = len(createdMessage.get("message", "")) + sum(len(doc.get("filename", "")) for doc in createdMessage.get("documents", [])) + self.updateWorkflowStats(workflowId, bytesSent=0, bytesReceived=message_size) + except Exception as e: logger.error(f"Error creating workflow message: {str(e)}") return None @@ -535,6 +540,64 @@ class ChatObjects: # Get logs for this workflow return [ChatLog(**log) for log in self.db.getRecordset("workflowLogs", recordFilter={"workflowId": workflowId})] + def updateWorkflowStats(self, workflowId: str, bytesSent: int = 0, bytesReceived: int = 0) -> bool: + """Updates workflow statistics during execution with incremental values.""" + try: + # Get current workflow + workflow = self.getWorkflow(workflowId) + if not workflow: + logger.error(f"Workflow {workflowId} not found for stats update") + return False + + if not self._canModify("workflows", workflowId): + logger.error(f"No permission to update workflow {workflowId} stats") + return False + + # Get current stats + currentStats = workflow.stats.dict() if workflow.stats else { + "bytesSent": 0, + "bytesReceived": 0, + "tokenCount": 0, + "processingTime": 0 + } + + # Calculate processing time from workflow start + workflow_start = datetime.fromisoformat(workflow.startedAt.replace('Z', '+00:00')) + current_time = datetime.now(UTC) + processing_time = (current_time - workflow_start).total_seconds() + + # Update stats with incremental values + currentStats["bytesSent"] = currentStats.get("bytesSent", 0) + bytesSent + currentStats["bytesReceived"] = currentStats.get("bytesReceived", 0) + bytesReceived + currentStats["tokenCount"] = currentStats["bytesSent"] + currentStats["bytesReceived"] + currentStats["processingTime"] = processing_time + + # Update workflow in database + self.db.recordModify("workflows", workflowId, { + "dataStats": currentStats + }) + + # Log to stats table + stats_record = { + "timestamp": self._getCurrentTimestamp(), + "workflowId": workflowId, + "bytesSent": bytesSent, + "bytesReceived": bytesReceived, + "tokenCount": bytesSent + bytesReceived, + "processingTime": processing_time + } + + # Create stats record in database + self.db.recordCreate("stats", stats_record) + + logger.debug(f"Updated workflow {workflowId} stats: {currentStats}") + logger.debug(f"Logged stats record: {stats_record}") + return True + + except Exception as e: + logger.error(f"Error updating workflow stats: {str(e)}") + return False + def createWorkflowLog(self, logData: Dict[str, Any]) -> ChatLog: """Creates a log entry for a workflow if user has access.""" # Check workflow access @@ -777,14 +840,7 @@ class ChatObjects: # Create workflow workflow = self.createWorkflow(workflowData) - # Add log entry - self.createWorkflowLog({ - "workflowId": workflow.id, - "message": "Workflow started", - "type": "info", - "status": "running", - "progress": 0 - }) + # Remove the 'Workflow started' log entry # Start workflow processing from modules.workflow.managerWorkflow import WorkflowManager diff --git a/modules/methods/methodCoder.py b/modules/methods/EXCLUDED_methodCoder.py similarity index 93% rename from modules/methods/methodCoder.py rename to modules/methods/EXCLUDED_methodCoder.py index d9cc5289..33d285a0 100644 --- a/modules/methods/methodCoder.py +++ b/modules/methods/EXCLUDED_methodCoder.py @@ -10,9 +10,9 @@ logger = logging.getLogger(__name__) class MethodCoder(MethodBase): """Coder method implementation for code operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the coder method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "coder" self.description = "Handle code operations like analysis, generation, and refactoring" @@ -87,7 +87,18 @@ class MethodCoder(MethodBase): ) # Extract text content from ExtractedContent objects - text_contents = self.service.extractTextFromContentObjects(all_code_content) + text_contents = [] + for content_obj in all_code_content: + if hasattr(content_obj, 'contents') and content_obj.contents: + # Extract text from ContentItem objects + for content_item in content_obj.contents: + if hasattr(content_item, 'data') and content_item.data: + text_contents.append(content_item.data) + elif isinstance(content_obj, str): + text_contents.append(content_obj) + else: + # Fallback: convert to string representation + text_contents.append(str(content_obj)) # Combine all extracted text content for analysis combined_content = "\n\n--- CODE SEPARATOR ---\n\n".join(text_contents) diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py index a1b437de..208f736d 100644 --- a/modules/methods/methodDocument.py +++ b/modules/methods/methodDocument.py @@ -8,7 +8,6 @@ from typing import Dict, Any, List, Optional import uuid from datetime import datetime, UTC -from modules.workflow.managerDocument import DocumentManager from modules.workflow.methodBase import MethodBase, ActionResult, action logger = logging.getLogger(__name__) @@ -16,12 +15,11 @@ logger = logging.getLogger(__name__) class MethodDocument(MethodBase): """Document method implementation for document operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the document method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "document" self.description = "Handle document operations like extraction and analysis" - self.documentManager = DocumentManager(serviceContainer) @action async def extract(self, parameters: Dict[str, Any]) -> ActionResult: @@ -94,7 +92,18 @@ class MethodDocument(MethodBase): ) # Extract text content from ExtractedContent objects - text_contents = self.service.extractTextFromContentObjects(all_extracted_content) + text_contents = [] + for content_obj in all_extracted_content: + if hasattr(content_obj, 'contents') and content_obj.contents: + # Extract text from ContentItem objects + for content_item in content_obj.contents: + if hasattr(content_item, 'data') and content_item.data: + text_contents.append(content_item.data) + elif isinstance(content_obj, str): + text_contents.append(content_obj) + else: + # Fallback: convert to string representation + text_contents.append(str(content_obj)) # Combine all extracted text content combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(text_contents) diff --git a/modules/methods/methodOutlook.py b/modules/methods/methodOutlook.py index f681931e..fb226731 100644 --- a/modules/methods/methodOutlook.py +++ b/modules/methods/methodOutlook.py @@ -16,9 +16,9 @@ logger = logging.getLogger(__name__) class MethodOutlook(MethodBase): """Outlook method implementation for email operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the Outlook method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "outlook" self.description = "Handle Microsoft Outlook email operations" diff --git a/modules/methods/methodSharepoint.py b/modules/methods/methodSharepoint.py index cb36b57b..dbfc4c1f 100644 --- a/modules/methods/methodSharepoint.py +++ b/modules/methods/methodSharepoint.py @@ -16,8 +16,8 @@ logger = logging.getLogger(__name__) class MethodSharepoint(MethodBase): """SharePoint method implementation for document operations""" - def __init__(self, serviceContainer: Any): - super().__init__(serviceContainer) + def __init__(self, serviceCenter: Any): + super().__init__(serviceCenter) self.name = "sharepoint" self.description = "Handle Microsoft SharePoint document operations" diff --git a/modules/methods/methodWeb.py b/modules/methods/methodWeb.py index e993ab55..4602a5a0 100644 --- a/modules/methods/methodWeb.py +++ b/modules/methods/methodWeb.py @@ -19,9 +19,9 @@ logger = logging.getLogger(__name__) class MethodWeb(MethodBase): """Web method implementation for web operations""" - def __init__(self, serviceContainer: Any): + def __init__(self, serviceCenter: Any): """Initialize the web method""" - super().__init__(serviceContainer) + super().__init__(serviceCenter) self.name = "web" self.description = "Handle web operations like crawling and scraping" @@ -452,7 +452,7 @@ class MethodWeb(MethodBase): "query": query } else: - # Get user language from service container if available + # Get user language from service center if available userLanguage = "en" # Default language if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'): userLanguage = self.service.user.language diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py index de39a1c4..e01cfeb7 100644 --- a/modules/routes/routeWorkflows.py +++ b/modules/routes/routeWorkflows.py @@ -176,7 +176,7 @@ async def get_workflow_status( ) -> ChatWorkflow: """Get the current status of a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Retrieve workflow @@ -208,7 +208,7 @@ async def get_workflow_logs( ) -> List[ChatLog]: """Get logs for a workflow with support for selective data transfer.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -251,7 +251,7 @@ async def get_workflow_messages( ) -> List[ChatMessage]: """Get messages for a workflow with support for selective data transfer.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -297,7 +297,7 @@ async def start_workflow( Corresponds to State 1 in the state machine documentation. """ try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Start or continue workflow using ChatObjects @@ -322,7 +322,7 @@ async def stop_workflow( ) -> ChatWorkflow: """Stops a running workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Stop workflow using ChatObjects @@ -347,7 +347,7 @@ async def delete_workflow( ) -> Dict[str, Any]: """Deletes a workflow and its associated data.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Get raw workflow data from database to check permissions @@ -402,7 +402,7 @@ async def delete_workflow_message( ) -> Dict[str, Any]: """Delete a message from a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists @@ -453,7 +453,7 @@ async def delete_file_from_message( ) -> Dict[str, Any]: """Delete a file reference from a message in a workflow.""" try: - # Get service container + # Get service center interfaceChat = getServiceChat(currentUser) # Verify workflow exists diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py index 864a7dc9..470653d8 100644 --- a/modules/workflow/managerChat.py +++ b/modules/workflow/managerChat.py @@ -2,6 +2,7 @@ import asyncio import logging import uuid import json +import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC @@ -9,7 +10,7 @@ from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow ) -from modules.workflow.serviceContainer import ServiceContainer +from modules.workflow.serviceCenter import ServiceCenter from modules.interfaces.interfaceChatObjects import ChatObjects logger = logging.getLogger(__name__) @@ -20,7 +21,7 @@ class ChatManager: def __init__(self, currentUser: User, chatInterface: ChatObjects): self.currentUser = currentUser self.chatInterface = chatInterface - self.service: ServiceContainer = None + self.service: ServiceCenter = None self.workflow: ChatWorkflow = None # Circuit breaker for AI calls @@ -37,7 +38,7 @@ class ChatManager: async def initialize(self, workflow: ChatWorkflow) -> None: """Initialize chat manager with workflow""" self.workflow = workflow - self.service = ServiceContainer(self.currentUser, self.workflow) + self.service = ServiceCenter(self.currentUser, self.workflow) # ===== WORKFLOW PHASES ===== @@ -119,6 +120,12 @@ class ChatManager: task_actions.append(task_action) logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}") + # Update stats for task validation (estimate bytes for action validation) + if task_actions: + # Calculate actual action size for stats + action_size = self.service.calculateObjectSize(task_actions) + self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size) + logger.info(f"Task action definition completed: {len(task_actions)} actions") return task_actions @@ -265,6 +272,7 @@ class ChatManager: async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: """Process file IDs and return ChatDocument objects""" documents = [] + for fileId in fileIds: try: # Ensure service is initialized @@ -290,6 +298,8 @@ class ChatManager: logger.warning(f"No file info found for file ID {fileId}") except Exception as e: logger.error(f"Error processing file ID {fileId}: {str(e)}") + + return documents def setUserLanguage(self, language: str) -> None: @@ -768,7 +778,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'documents_metadata': documents_metadata, 'actionId': action_result.get('actionId', ''), 'actionMethod': action_result.get('actionMethod', ''), - 'actionName': action_result.get('actionName', '') + 'actionName': action_result.get('actionName', ''), + 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none' } step_result_serializable['action_results'].append(serializable_action_result) @@ -787,6 +798,13 @@ INSTRUCTIONS: 4. Decide on next action: continue, retry, or fail 5. If retry, provide specific improvements needed +IMPORTANT NOTES: +- Actions can produce either text results OR documents (or both) +- Empty result_summary is acceptable if documents were produced (documents_count > 0) +- Focus on whether the action achieved its intended purpose, not just text output +- Document-based actions (like file extractions) often have empty text results but successful document outputs +- Check the 'success_indicator' field: 'documents' means success via document output, 'text_result' means success via text, 'none' means no output + REQUIRED JSON STRUCTURE: {{ "status": "success|retry|failed", @@ -829,7 +847,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]: """Execute a single action and return result with enhanced document processing""" try: - # Execute the actual method action using the service container + # Execute the actual method action using the service center result = await self.service.executeAction( methodName=action.execMethod, actionName=action.execAction, @@ -943,7 +961,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" message_data = { "workflowId": workflow.id, "role": "assistant", - "message": f"Executed {action.execMethod}.{action.execAction} successfully", + "message": f"Executed action {action.execMethod}.{action.execAction}", "status": "step", "sequenceNr": len(workflow.messages) + 1, "publishedAt": datetime.now(UTC).isoformat(), @@ -979,7 +997,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" file_size = len(str(doc_data)) mime_type = "application/octet-stream" - # Enhanced MIME type detection using service container + # Enhanced MIME type detection using service center if mime_type == "application/octet-stream": mime_type = self._detectMimeTypeFromContent(document_data, document_name) @@ -1045,7 +1063,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" def _detectMimeTypeFromContent(self, content: Any, filename: str) -> str: """ - Detect MIME type from content and filename using service container. + Detect MIME type from content and filename using service center. Only returns a detected MIME type if it's better than application/octet-stream. Args: @@ -1065,7 +1083,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" else: file_bytes = str(content).encode('utf-8') - # Use service container's MIME type detection + # Use service center's MIME type detection detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type @@ -1076,7 +1094,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" def _detectMimeTypeFromDocument(self, document: Any, filename: str) -> str: """ - Detect MIME type from document object using service container. + Detect MIME type from document object using service center. Only returns a detected MIME type if it's better than application/octet-stream. Args: @@ -1094,7 +1112,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" else: file_bytes = str(content).encode('utf-8') - # Use service container's MIME type detection + # Use service center's MIME type detection detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) if detected_mime_type != "application/octet-stream": return detected_mime_type @@ -1222,8 +1240,11 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" action_results = review_context.get('action_results', []) if action_results: # Check for common issues that warrant retry + # Only consider empty results a problem if there are no documents produced has_empty_results = any( - not result.get('result', '').strip() + not result.get('result', '').strip() and + not result.get('documents', []) and + not result.get('documents_metadata', []) for result in action_results if result.get('status') == 'completed' ) @@ -1417,7 +1438,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" assistant_messages = [msg for msg in workflow.messages if msg.role == 'assistant'] # Generate summary feedback - feedback = f"Workflow completed successfully.\n\n" + feedback = f"Workflow completed.\n\n" feedback += f"Processed {len(user_messages)} user inputs and generated {len(assistant_messages)} responses.\n" # Add final status @@ -1437,36 +1458,38 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # ===== UNIFIED WORKFLOW EXECUTION ===== async def executeUnifiedWorkflow(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]: - """Execute workflow using the new unified phases with retry logic""" + """Execute a unified workflow with all phases""" try: logger.info(f"Starting unified workflow execution for workflow {workflow.id}") + start_time = time.time() - # Create user-friendly progress log - self.chatInterface.createWorkflowLog({ - "workflowId": workflow.id, - "message": "Starting workflow analysis and planning", - "type": "info", - "status": "running", - "progress": 5, - "agentName": "System" - }) + # Initialize chat manager with workflow + await self.initialize(workflow) + + # Process file IDs if provided + documents = [] + if hasattr(userInput, 'listFileId') and userInput.listFileId: + documents = await self.processFileIds(userInput.listFileId) + logger.info(f"Processed {len(documents)} documents") + + # Calculate and update user input stats + user_input_size = self.service.calculateUserInputSize(userInput) + self.service.updateWorkflowStats(eventLabel="userinput", bytesReceived=user_input_size) # Phase 1: High-Level Task Planning - logger.info("=== PHASE 1: HIGH-LEVEL TASK PLANNING ===") - task_plan = await self.planHighLevelTasks(userInput, workflow) - if not task_plan or not task_plan.get('tasks'): - logger.error("Failed to create task plan") - return { - 'status': 'failed', - 'error': 'Failed to create task plan', - 'phase': 'planning' - } + logger.info("--- PHASE 1: HIGH-LEVEL TASK PLANNING ---") + task_plan = await self.planHighLevelTasks(userInput.prompt, workflow) + + # Update stats for task planning + task_plan_size = self.service.calculateObjectSize(task_plan) + self.service.updateWorkflowStats(eventLabel="taskplan", bytesSent=task_plan_size) # Create user-friendly task plan log tasks_count = len(task_plan.get('tasks', [])) + task_descriptions = "\n".join([f"- {task.get('description', 'No description')}" for task in task_plan.get('tasks', [])]) self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Planning completed: {tasks_count} tasks identified", + "message": f"Planning completed: {tasks_count} tasks identified\n{task_descriptions}", "type": "info", "status": "running", "progress": 15, @@ -1598,22 +1621,29 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}") # Phase 3: Execute Task Actions - logger.info(f"--- PHASE 3: EXECUTING ACTIONS FOR TASK {i+1} ---") + logger.info(f"--- PHASE 3: EXECUTING TASK {i+1} ACTIONS ---") action_results = await self.executeTaskActions(task_actions, workflow) + # Update stats for action execution + # Action stats are already handled by the service center during AI calls + # Create user-friendly action completion log with quality metrics successful_actions = sum(1 for result in action_results if result.get('status') == 'completed') total_actions = len(action_results) if total_actions > 0: - quality_percentage = (successful_actions / total_actions) * 100 + if successful_actions == total_actions: + log_type = "success" + elif successful_actions == 0: + log_type = "error" + else: + log_type = "warning" self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} actions completed: {successful_actions}/{total_actions} successful ({quality_percentage:.0f}% quality)", - "type": "success" if quality_percentage >= 80 else "warning" if quality_percentage >= 60 else "error", + "message": f"Successful actions: {successful_actions}/{total_actions}", + "type": log_type, "status": "running", - "progress": progress + 10, - "agentName": "System" + "progress": progress + 10 }) # Log action results (with metadata only) @@ -1653,6 +1683,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---") review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow) + # Update stats for task review + # Task review stats are already handled by the service center during AI calls + # Create user-friendly review log with quality metrics quality_metrics = review_result.get('quality_metrics', {}) quality_score = quality_metrics.get('score', 0) @@ -1662,29 +1695,62 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" if review_status == 'success': self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} completed successfully (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": f"🎯 Task completed successfully with quality score {quality_score} and confidence {confidence}", "type": "success", "status": "running", - "progress": progress + 20, - "agentName": "System" + "progress": progress + 20 }) elif review_status == 'retry': + # Extract improvement details + improvements = review_result.get('improvements', '') + reason = review_result.get('reason', '') + unmet_criteria = review_result.get('unmet_criteria', []) + + # Build detailed message + retry_details = [] + if reason: + retry_details.append(f"Reason: {reason}") + if improvements: + retry_details.append(f"Improvements: {improvements}") + if unmet_criteria: + retry_details.append(f"Missing criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") + + retry_message = f"🔄 Task needs improvement" + if retry_details: + retry_message += f"\n{chr(10).join(retry_details)}" + self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} needs improvement (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": retry_message, "type": "warning", "status": "running", - "progress": progress + 15, - "agentName": "System" + "progress": progress + 15 }) else: + # Extract failure details + reason = review_result.get('reason', '') + unmet_criteria = review_result.get('unmet_criteria', []) + missing_outputs = review_result.get('missing_outputs', []) + + # Build detailed failure message + failure_details = [] + if reason: + failure_details.append(f"Reason: {reason}") + if unmet_criteria: + failure_details.append(f"Unmet criteria: {', '.join(unmet_criteria[:3])}{'...' if len(unmet_criteria) > 3 else ''}") + if missing_outputs: + failure_details.append(f"Missing outputs: {', '.join(missing_outputs[:3])}{'...' if len(missing_outputs) > 3 else ''}") + + failure_message = f"❌ Task failed" + if failure_details: + failure_message += f"\n{chr(10).join(failure_details)}" + self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Task {i+1} failed (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)", + "message": failure_message, "type": "error", "status": "running", - "progress": progress + 15, - "agentName": "System" + "progress": progress + 15 }) # Log review result (with metadata only) @@ -1724,7 +1790,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" previous_review_feedback = review_result.get('improvements', '') retry_count += 1 - if retry_count >= max_retries: + if retry_count > max_retries: logger.error(f"Task {i+1} failed after {max_retries} retries") task_success = False else: @@ -1775,35 +1841,37 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" # Final workflow summary successful_tasks = sum(1 for result in workflow_results if result.get('task_success', False)) - total_tasks = len(workflow_results) + total_tasks = len(task_plan['tasks']) + + # Final workflow stats are already handled by the service center during AI calls + + # Calculate total processing time + total_processing_time = time.time() - start_time # Create final user-friendly completion log if successful_tasks == total_tasks: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow completed successfully: {successful_tasks}/{total_tasks} tasks completed", + "message": f"🎉 Workflow completed ({successful_tasks}/{total_tasks} tasks)", "type": "success", "status": "completed", - "progress": 100, - "agentName": "System" + "progress": 100 }) elif successful_tasks > 0: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow completed partially: {successful_tasks}/{total_tasks} tasks completed", + "message": f"⚠️ Workflow partially completed ({successful_tasks}/{total_tasks} tasks)", "type": "warning", "status": "completed", - "progress": 100, - "agentName": "System" + "progress": 100 }) else: self.chatInterface.createWorkflowLog({ "workflowId": workflow.id, - "message": f"Workflow failed: {successful_tasks}/{total_tasks} tasks completed", + "message": f"❌ Workflow failed ({successful_tasks}/{total_tasks} tasks)", "type": "error", "status": "failed", - "progress": 100, - "agentName": "System" + "progress": 100 }) # Create serializable workflow results (with metadata only) @@ -1836,7 +1904,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" 'documents_metadata': documents_metadata, 'actionId': action_result.get('actionId', ''), 'actionMethod': action_result.get('actionMethod', ''), - 'actionName': action_result.get('actionName', '') + 'actionName': action_result.get('actionName', ''), + 'success_indicator': 'documents' if len(documents_metadata) > 0 else 'text_result' if action_result.get('result', '').strip() else 'none' } action_results_metadata.append(action_result_metadata) diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py deleted file mode 100644 index b1b8e709..00000000 --- a/modules/workflow/managerDocument.py +++ /dev/null @@ -1,73 +0,0 @@ -""" -Document Manager Module for handling document operations and content extraction. -""" - -import logging - -from modules.interfaces.interfaceChatModel import ( - ChatDocument, - ExtractedContent -) -from modules.workflow.processorDocument import DocumentProcessor - -logger = logging.getLogger(__name__) - -class DocumentManager: - """Manager for document operations and content extraction""" - - def __init__(self, serviceContainer): - self.service = serviceContainer - # Create processor with service container for AI calls - self._processor = DocumentProcessor(serviceContainer) - - async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: - """Extract content from ChatDocument using prompt""" - try: - # Extract file data from ChatDocument - if document.data: - fileData = document.data.encode('utf-8') if isinstance(document.data, str) else document.data - else: - # Try to get file data from service container if document has fileId - if hasattr(document, 'fileId') and document.fileId: - fileData = self.service.getFileData(document.fileId) - else: - logger.error(f"No file data available in document: {document}") - raise ValueError("No file data available in document") - - # Get filename and mime type from document - filename = document.filename if hasattr(document, 'filename') else "document" - mimeType = document.mimeType if hasattr(document, 'mimeType') else "application/octet-stream" - - # Process with processor - extractedContent = await self._processor.processFileData( - fileData=fileData, - filename=filename, - mimeType=mimeType, - base64Encoded=False, - prompt=prompt - ) - - # Update objectId to match document ID - extractedContent.objectId = document.id - extractedContent.objectType = "ChatDocument" - - return extractedContent - - except Exception as e: - logger.error(f"Error extracting from document: {str(e)}") - raise - - async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> ExtractedContent: - """Extract content from file data directly using prompt""" - try: - return await self._processor.processFileData( - fileData=fileData, - filename=filename, - mimeType=mimeType, - base64Encoded=base64Encoded, - prompt=prompt, - documentId=documentId - ) - except Exception as e: - logger.error(f"Error extracting from file data: {str(e)}") - raise diff --git a/modules/workflow/methodBase.py b/modules/workflow/methodBase.py index fe109512..8f09cb52 100644 --- a/modules/workflow/methodBase.py +++ b/modules/workflow/methodBase.py @@ -20,9 +20,9 @@ def action(func): class MethodBase: """Base class for all methods""" - def __init__(self, serviceContainer: Any): - """Initialize method with service container""" - self.service = serviceContainer + def __init__(self, serviceCenter: Any): + """Initialize method with service center""" + self.service = serviceCenter self.name: str self.description: str self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py index 75929f86..323c6f7f 100644 --- a/modules/workflow/processorDocument.py +++ b/modules/workflow/processorDocument.py @@ -32,10 +32,10 @@ class FileProcessingError(Exception): class DocumentProcessor: """Processor for handling document operations and content extraction.""" - def __init__(self, serviceContainer=None): + def __init__(self, serviceCenter=None): """Initialize the document processor.""" self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None - self._serviceContainer = serviceContainer + self._serviceCenter = serviceCenter self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = { 'text/plain': self._processText, @@ -136,7 +136,7 @@ class DocumentProcessor: # Detect content type if needed if mimeType == "application/octet-stream": - mimeType = self._serviceContainer.detectContentTypeFromData(fileData, filename) + mimeType = self._serviceCenter.detectContentTypeFromData(fileData, filename) # Process document based on type if mimeType not in self.supportedTypes: @@ -527,7 +527,7 @@ class DocumentProcessor: # chunk is already base64 encoded string from _processImage # Use the original prompt directly for images (no content embedding) logger.debug(f"Calling image AI service for MIME type: {mimeType}") - processedContent = await self._serviceContainer.callAiImageBasic(prompt, chunk, mimeType) + processedContent = await self._serviceCenter.callAiImageBasic(prompt, chunk, mimeType) else: # For text content, use text AI service # Neutralize content if neutralizer is enabled (only for text) @@ -548,7 +548,7 @@ class DocumentProcessor: """ logger.debug(f"Calling text AI service for MIME type: {mimeType}") - processedContent = await self._serviceContainer.callAiTextBasic(aiPrompt, contentToProcess) + processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess) chunkResults.append(processedContent) except Exception as aiError: diff --git a/modules/workflow/serviceContainer.py b/modules/workflow/serviceCenter.py similarity index 76% rename from modules/workflow/serviceContainer.py rename to modules/workflow/serviceCenter.py index ea2f87b1..d403aca9 100644 --- a/modules/workflow/serviceContainer.py +++ b/modules/workflow/serviceCenter.py @@ -8,14 +8,14 @@ from modules.interfaces.interfaceAppModel import User, UserConnection from modules.interfaces.interfaceChatModel import ( TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, - ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange + ChatStat, ChatLog, ChatMessage, ChatWorkflow, DocumentExchange, ExtractedContent ) from modules.interfaces.interfaceAiCalls import AiCalls from modules.interfaces.interfaceChatObjects import getInterface as getChatObjects from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects -from modules.workflow.managerDocument import DocumentManager +from modules.workflow.processorDocument import DocumentProcessor from modules.workflow.methodBase import MethodBase import uuid import base64 @@ -23,8 +23,8 @@ import hashlib logger = logging.getLogger(__name__) -class ServiceContainer: - """Service container that provides access to all services and their functions""" +class ServiceCenter: + """Service center that provides access to all services and their functions""" def __init__(self, currentUser: User, workflow: ChatWorkflow): # Core services @@ -39,7 +39,7 @@ class ServiceContainer: self.interfaceComponent = getComponentObjects(currentUser) self.interfaceApp = getAppObjects(currentUser) self.interfaceAiCalls = AiCalls() - self.documentManager = DocumentManager(self) + self.documentProcessor = DocumentProcessor(self) # Initialize methods catalog self.methods = {} @@ -115,7 +115,7 @@ class ServiceContainer: def detectContentTypeFromData(self, fileData: bytes, filename: str) -> str: """ Detect content type from file data and filename. - This method makes the MIME type detection function accessible through the service container. + This method makes the MIME type detection function accessible through the service center. Args: fileData: Raw file data as bytes @@ -263,17 +263,11 @@ class ServiceContainer: # ===== Functions ===== - def extractContent(self, prompt: str, document: ChatDocument) -> str: + def extractContent(self, prompt: str, document: ChatDocument) -> ExtractedContent: """Extract content from document using prompt""" - return self.documentManager.extractContentFromDocument(prompt, document) + return self.extractContentFromDocument(prompt, document) - async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> str: - """Extract content from file data directly using prompt""" - extracted_content = await self.documentManager.extractContentFromFileData(prompt, fileData, filename, mimeType, base64Encoded, documentId) - # Convert ExtractedContent to string for backward compatibility - if hasattr(extracted_content, 'contents'): - return "\n".join([item.data for item in extracted_content.contents]) - return str(extracted_content) + def getMethodsCatalog(self) -> Dict[str, Any]: """Get catalog of available methods and their actions""" @@ -502,7 +496,7 @@ Instructions: Please provide a comprehensive summary of this conversation.""" # Get summary using AI - return await self.interfaceAiCalls.callAiTextBasic(prompt) + return await self.callAiTextBasic(prompt) except Exception as e: logger.error(f"Error summarizing chat: {str(e)}") @@ -535,27 +529,81 @@ Instructions: Please provide a clear summary of this message.""" # Get summary using AI - return await self.interfaceAiCalls.callAiTextBasic(prompt) + return await self.callAiTextBasic(prompt) except Exception as e: logger.error(f"Error summarizing message: {str(e)}") return f"Error summarizing message: {str(e)}" - def callAiTextBasic(self, prompt: str, context: str = None) -> str: + async def callAiTextBasic(self, prompt: str, context: str = None) -> str: """Basic text processing using OpenAI""" - return self.interfaceAiCalls.callAiTextBasic(prompt, context) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + if context: + prompt_size += self.calculateObjectSize(context) + + # Call AI + response = await self.interfaceAiCalls.callAiTextBasic(prompt, context) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.openai.text", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: + async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str: """Advanced text processing using Anthropic""" - return self.interfaceAiCalls.callAiTextAdvanced(prompt, context) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + if context: + prompt_size += self.calculateObjectSize(context) + + # Call AI + response = await self.interfaceAiCalls.callAiTextAdvanced(prompt, context) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.anthropic.text", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiImageBasic(self, prompt: str, imageData: str, mimeType: str) -> str: + async def callAiImageBasic(self, prompt: str, imageData: str, mimeType: str) -> str: """Basic image processing using OpenAI""" - return self.interfaceAiCalls.callAiImageBasic(prompt, imageData, mimeType) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + prompt_size += self.calculateObjectSize(imageData) + + # Call AI + response = await self.interfaceAiCalls.callAiImageBasic(prompt, imageData, mimeType) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.openai.image", bytesSent=prompt_size, bytesReceived=response_size) + + return response - def callAiImageAdvanced(self, prompt: str, imageData: str, mimeType: str) -> str: + async def callAiImageAdvanced(self, prompt: str, imageData: str, mimeType: str) -> str: """Advanced image processing using Anthropic""" - return self.interfaceAiCalls.callAiImageAdvanced(prompt, imageData, mimeType) + # Calculate prompt size for stats + prompt_size = self.calculateObjectSize(prompt) + prompt_size += self.calculateObjectSize(imageData) + + # Call AI + response = await self.interfaceAiCalls.callAiImageAdvanced(prompt, imageData, mimeType) + + # Calculate response size for stats + response_size = self.calculateObjectSize(response) + + # Update stats + self.updateWorkflowStats(eventLabel="aicall.anthropic.image", bytesSent=prompt_size, bytesReceived=response_size) + + return response def getFileInfo(self, fileId: str) -> Dict[str, Any]: """Get file information""" @@ -575,6 +623,59 @@ Please provide a clear summary of this message.""" """Get file data by ID""" return self.interfaceComponent.getFileData(fileId) + async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent: + """Extract content from ChatDocument using prompt""" + try: + # Extract file data from ChatDocument + if document.data: + fileData = document.data.encode('utf-8') if isinstance(document.data, str) else document.data + else: + # Try to get file data from service center if document has fileId + if hasattr(document, 'fileId') and document.fileId: + fileData = self.getFileData(document.fileId) + else: + logger.error(f"No file data available in document: {document}") + raise ValueError("No file data available in document") + + # Get filename and mime type from document + filename = document.filename if hasattr(document, 'filename') else "document" + mimeType = document.mimeType if hasattr(document, 'mimeType') else "application/octet-stream" + + # Process with document processor directly + extractedContent = await self.documentProcessor.processFileData( + fileData=fileData, + filename=filename, + mimeType=mimeType, + base64Encoded=False, + prompt=prompt, + documentId=document.id + ) + + # Update objectId to match document ID + extractedContent.objectId = document.id + extractedContent.objectType = "ChatDocument" + + return extractedContent + + except Exception as e: + logger.error(f"Error extracting from document: {str(e)}") + raise + + async def extractContentFromFileData(self, prompt: str, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, documentId: str = None) -> ExtractedContent: + """Extract content from file data directly using prompt""" + try: + return await self.documentProcessor.processFileData( + prompt=prompt, + fileData=fileData, + filename=filename, + mimeType=mimeType, + base64Encoded=base64Encoded, + documentId=documentId + ) + except Exception as e: + logger.error(f"Error extracting from file data: {str(e)}") + raise + def createFile(self, fileName: str, mimeType: str, content: str, base64encoded: bool = False) -> str: """Create new file and return its ID""" # Convert content to bytes based on base64 flag @@ -613,29 +714,85 @@ Please provide a clear summary of this message.""" mimeType=mimeType ) - def extractTextFromContentObjects(self, content_objects: List[Any]) -> List[str]: + def updateWorkflowStats(self, eventLabel: str = None, bytesSent: int = 0, bytesReceived: int = 0, tokenCount: int = 0) -> None: """ - Extract text content from ExtractedContent objects or other content objects. + Centralized function to update workflow statistics in database and running workflow. Args: - content_objects: List of ExtractedContent objects or other content objects + eventLabel: Label for the event (e.g., "userinput", "taskplan", "action", "aicall