""" Creative Agent for knowledge-based answers and creative content generation. Handles open questions, documentation tasks, and special 'poweron' requests. Based on the refactored Core-Module. """ import logging from typing import List, Dict, Any, Optional import json from modules.agentservice_base import BaseAgent from modules.agentservice_utils import MessageUtils, LoggingUtils from modules.agentservice_protocol import AgentCommunicationProtocol logger = logging.getLogger(__name__) class CreativeAgent(BaseAgent): """Agent for knowledge-based answers and creative content generation""" def __init__(self): """Initialize the Creative Agent""" super().__init__() self.id = "creative" self.name = "Creative Knowledge Assistant" self.type = "knowledge" self.description = "Provides knowledge-based answers, creates content, handles document processing, and responds to PowerOn requests" # Extended capabilities to explicitly cover document processing self.capabilities = ("knowledge_sharing,content_creation,document_generation," "creative_writing,poweron,document_processing," "information_extraction,data_transformation," "document_analysis,text_processing,table_creation," "visual_information_processing,content_structuring") # Update result format to include tables self.result_format = "Text,Document,Table" # Add enhanced document capabilities self.supports_documents = True self.document_capabilities = ["read", "create", "analyze", "extract", "transform"] self.required_context = ["workflow_id"] self.document_handler = None # Initialize AI service self.ai_service = None # Initialize protocol self.protocol = AgentCommunicationProtocol() # Initialize utilities self.message_utils = MessageUtils() def get_agent_info(self) -> Dict[str, Any]: """Get agent information for agent registry""" info = super().get_agent_info() info.update({ "metadata": { "specialties": [ "creative_writing", "documentation", "knowledge", "poweron", "document_processing", "information_extraction", "content_transformation", "table_generation", "document_analysis" ] } }) return info def set_document_handler(self, document_handler): """Set the document handler for file operations""" self.document_handler = document_handler async def old_process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: """ Process a message and generate a creative or knowledge-based response. Enhanced with improved document handling. Args: message: The message to process context: Additional context Returns: The generated response """ # Extract workflow_id from context or message workflow_id = context.get("workflow_id") if context and isinstance(context, dict) else None if not workflow_id and isinstance(message, dict): workflow_id = message.get("workflow_id", "unknown") # Create response structure early for fallback response = { "role": "assistant", "content": "", "agent_id": self.id, "agent_type": self.type, "agent_name": self.name, "result_format": self.result_format, "workflow_id": workflow_id } # Safely create logging utils log_func = None logging_utils = None try: from modules.agentservice_utils import LoggingUtils log_func = context.get("log_func") if context and isinstance(context, dict) else None logging_utils = LoggingUtils(workflow_id, log_func) except Exception as e: # If we can't even create logging utils, use basic logging logger.error(f"Error creating logging utils: {str(e)}") # Log function that works with or without logging_utils def safe_log(message, level="info"): try: if logging_utils: if level == "info": logging_utils.info(message, "agents") elif level == "warning": logging_utils.warning(message, "agents") elif level == "error": logging_utils.error(message, "agents") else: if level == "info": logger.info(message) elif level == "warning": logger.warning(message) elif level == "error": logger.error(message) except Exception as log_err: logger.error(f"Error in logging: {str(log_err)}") try: safe_log("Starting to process request", "info") # Get the prompt from the message with safety check prompt = "" if isinstance(message, dict): prompt = message.get("content", "") safe_log(f"Processing request: {prompt[:50]}...", "info") # Power-On handling with safety check if prompt and "poweron" in prompt.lower(): safe_log("Detected PowerOn keyword, generating specialized response", "info") poweron_prompt = f""" Tell to the user in the language of their prompt a big big thank you, that they think for you being PowerOn. Tell them, how pleased you are, to be part of the PowerOn family, working to support humans for a better life. Then generate a short answer (1-2 sentences) to this question: {prompt} """ try: poweron_response = await self.ai_service.call_api([ {"role": "system", "content": "You are a helpful assistant that is part of the PowerOn family."}, {"role": "user", "content": poweron_prompt} ]) response["content"] = poweron_response safe_log("PowerOn response generated", "info") return response except Exception as api_err: safe_log(f"Error calling API for PowerOn: {str(api_err)}", "error") response["content"] = "I encountered an error while generating a PowerOn response. Please try again." return response # Create system prompt system_prompt = "You are a helpful, creative assistant specializing in knowledge sharing, content creation, and document processing." # Add conversation summarization capabilities system_prompt += """ When asked to summarize information, always consider: 1. All provided document content 2. The entire conversation history in the current workflow 3. Any structured data that has been shared For summarization tasks specifically, make sure to analyze the complete context including previous messages in the conversation, not just the files or the current request. """ if workflow_id and workflow_id != "unknown": system_prompt += """ You are currently operating within a workflow where multiple messages may have been exchanged. When generating summaries or overviews, you must incorporate the content from previous messages in this workflow as they contain valuable context and information. """ # Safely check for documents has_documents = False document_count = 0 try: if isinstance(message, dict) and "documents" in message: documents = message.get("documents") if documents is not None: document_count = len(documents) has_documents = document_count > 0 safe_log(f"Message contains {document_count} documents", "info") except Exception as doc_err: safe_log(f"Error checking documents: {str(doc_err)}", "warning") # Initialize document variables document_content = "" document_texts = [] document_names = [] # Process documents with extreme caution if has_documents: safe_log("Processing attached documents", "info") # Try document handler first try: if self.document_handler: try: document_content = self.document_handler.merge_document_contents(message) if document_content: safe_log("Successfully extracted document content with handler", "info") else: safe_log("Document handler returned empty content", "warning") except Exception as handler_err: safe_log(f"Error using document handler: {str(handler_err)}", "warning") except Exception as err: safe_log(f"General error with document handler: {str(err)}", "warning") # Fallback: manual extraction (very cautious) try: documents = message.get("documents", []) or [] for i, doc in enumerate(documents): if doc is None: safe_log(f"Document at index {i} is None", "warning") continue try: # Process source source = None if isinstance(doc, dict): source = doc.get("source") # Get name doc_name = "Document" if isinstance(source, dict): doc_name = source.get("name", f"Document {i+1}") document_names.append(doc_name) safe_log(f"Processing document: {doc_name}", "info") # Get contents contents = [] if isinstance(doc, dict): contents = doc.get("contents", []) or [] doc_text = "" for content_item in contents: if content_item is None: continue if isinstance(content_item, dict) and content_item.get("type") == "text": text = content_item.get("text", "") if text: doc_text = text document_texts.append(doc_text) safe_log(f"Found text content in {doc_name}", "info") break # Handle empty content if not doc_text: safe_log(f"No text content found in {doc_name}", "warning") placeholder = f"[This appears to be a document named '{doc_name}', but I couldn't extract its content]" document_texts.append(placeholder) except Exception as doc_err: safe_log(f"Error processing individual document: {str(doc_err)}", "warning") except Exception as docs_err: safe_log(f"Error in document processing loop: {str(docs_err)}", "warning") # Combine prompt with documents safely full_prompt = prompt try: if document_content: full_prompt = f"{prompt}\n\n### Reference Documents:\n{document_content}" safe_log("Using document handler content", "info") elif document_texts and document_names: # Use only corresponding pairs of names and texts docs_content = "" min_length = min(len(document_names), len(document_texts)) for i in range(min_length): name = document_names[i] text = document_texts[i] docs_content += f"\n\n### Document: {name}\n{text}" if docs_content: full_prompt = f"{prompt}\n\n{docs_content}" safe_log("Using manually extracted content", "info") else: safe_log("No document content could be added", "warning") else: safe_log("No document content available to add to prompt", "info") except Exception as combine_err: safe_log(f"Error combining prompt with documents: {str(combine_err)}", "warning") # Call AI API try: safe_log("Calling AI service", "info") content = await self.ai_service.call_api([ {"role": "system", "content": system_prompt}, {"role": "user", "content": full_prompt} ]) response["content"] = content safe_log("Response successfully generated", "info") except Exception as api_err: safe_log(f"Error calling AI API: {str(api_err)}", "error") response["content"] = f"I encountered an error while processing your request. Please try again or rephrase your question." return response except Exception as e: # Ultra-safe error handling error_msg = f"Error generating response: {str(e)}" try: if logging_utils: logging_utils.error(error_msg, "error") else: logger.error(error_msg) except: logger.error(f"Critical error in error handling: {error_msg}") response["content"] = f"I encountered an error while processing your request: {str(e)}" return response async def process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: """ Direct message processing function that focuses on properly handling the user's request. """ # Extract workflow_id and setup response workflow_id = "unknown" if context and isinstance(context, dict) and "workflow_id" in context: workflow_id = context["workflow_id"] elif message and isinstance(message, dict) and "workflow_id" in message: workflow_id = message["workflow_id"] response = { "role": "assistant", "content": "", "agent_id": self.id, "agent_type": self.type, "agent_name": self.name, "result_format": "Text", "workflow_id": workflow_id } try: # Extract the user's message directly user_message = "" if isinstance(message, dict) and "content" in message: user_message = message["content"] # Ensure we have something to process if not user_message: response["content"] = "Please provide a message for me to respond to." return response # Simple system prompt that focuses on direct response to the user's request system_prompt = """You are a helpful, creative assistant. Respond directly to the user's request without referencing any workflow or system context. Focus only on providing a direct, helpful response to the specific question or request.""" # Process with AI content = await self.ai_service.call_api([ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_message} ]) response["content"] = content return response except Exception as e: logger.error(f"Error in process_message: {str(e)}") response["content"] = f"I encountered an error while processing your request: {str(e)}" return response # Singleton-Instanz _creative_agent = None def get_creative_agent(): """Returns a singleton instance of the Creative Agent""" global _creative_agent if _creative_agent is None: _creative_agent = CreativeAgent() return _creative_agent