# promptFactory.py # Enhanced prompt factory with reusable functions import json import logging import importlib import pkgutil import inspect from typing import Any, Dict, List from modules.datamodels.datamodelWorkflow import TaskContext, ReviewContext, DocumentExchange from modules.datamodels.datamodelChat import ChatDocument from modules.services.serviceGeneration.subDocumentUtility import getFileExtension from modules.workflows.methods.methodBase import MethodBase # Set up logger logger = logging.getLogger(__name__) # Global methods catalog - moved from serviceCenter methods = {} def discoverMethods(serviceCenter): """Dynamically discover all method classes and their actions in modules methods package""" try: # Import the methods package methodsPackage = importlib.import_module('modules.workflows.methods') # Discover all modules in the package for _, name, isPkg in pkgutil.iter_modules(methodsPackage.__path__): if not isPkg and name.startswith('method'): try: # Import the module module = importlib.import_module(f'modules.workflows.methods.{name}') # Find all classes in the module that inherit from MethodBase for itemName, item in inspect.getmembers(module): if (inspect.isclass(item) and issubclass(item, MethodBase) and item != MethodBase): # Instantiate the method methodInstance = item(serviceCenter) # Use the actions property from MethodBase which handles @action decorator actions = methodInstance.actions # Create method info methodInfo = { 'instance': methodInstance, 'actions': actions, 'description': item.__doc__ or f"Method {itemName}" } # Store the method with full class name methods[itemName] = methodInfo # Also store with short name for action executor access shortName = itemName.replace('Method', '').lower() methods[shortName] = methodInfo logger.info(f"Discovered method {itemName} (short: {shortName}) with {len(actions)} actions") except Exception as e: logger.error(f"Error discovering method {name}: {str(e)}") continue logger.info(f"Discovered {len(methods)} method entries total") except Exception as e: logger.error(f"Error discovering methods: {str(e)}") def getMethodsList(serviceCenter): """Get a list of available methods with their signatures""" if not methods: discoverMethods(serviceCenter) methodsList = [] for methodName, methodInfo in methods.items(): methodDescription = methodInfo['description'] actionsList = [] for actionName, actionInfo in methodInfo['actions'].items(): actionDescription = actionInfo['description'] parameters = actionInfo['parameters'] # Build parameter signature paramSig = [] for paramName, paramInfo in parameters.items(): paramType = paramInfo['type'] paramRequired = paramInfo['required'] paramDefault = paramInfo['default'] if paramRequired: paramSig.append(f"{paramName}: {paramType}") else: defaultStr = f" = {paramDefault}" if paramDefault is not None else " = None" paramSig.append(f"{paramName}: {paramType}{defaultStr}") paramSignature = f"({', '.join(paramSig)})" if paramSig else "()" actionsList.append(f"- {actionName}{paramSignature}: {actionDescription}") actionsStr = "\n".join(actionsList) methodsList.append(f"**{methodName}**: {methodDescription}\n{actionsStr}") return "\n\n".join(methodsList) # Reusable prompt element functions def getAvailableDocuments(context: Any) -> str: """Get available documents for prompt context""" try: if not context or not hasattr(context, 'available_documents') or not context.available_documents: return "No documents available" documents = context.available_documents if not isinstance(documents, list): return "No documents available" docList = [] for i, doc in enumerate(documents, 1): if isinstance(doc, ChatDocument): docInfo = f"{i}. **{doc.fileName}**" if hasattr(doc, 'mimeType') and doc.mimeType: docInfo += f" ({doc.mimeType})" if hasattr(doc, 'size') and doc.size: docInfo += f" - {doc.size} bytes" docList.append(docInfo) elif isinstance(doc, dict): docInfo = f"{i}. **{doc.get('fileName', 'Unknown')}**" if doc.get('mimeType'): docInfo += f" ({doc['mimeType']})" if doc.get('size'): docInfo += f" - {doc['size']} bytes" docList.append(docInfo) else: docList.append(f"{i}. {str(doc)}") return "\n".join(docList) if docList else "No documents available" except Exception as e: logger.error(f"Error getting available documents: {str(e)}") return "Error retrieving documents" def getWorkflowHistory(services, context: Any) -> str: """Get workflow history for prompt context""" try: if not context or not hasattr(context, 'workflow_id'): return "No workflow history available" workflowId = context.workflow_id if not workflowId: return "No workflow history available" # Get workflow messages messages = services.interfaceDbChat.getWorkflowMessages(workflowId) if not messages: return "No workflow history available" # Filter for relevant messages (last 10) recentMessages = messages[-10:] if len(messages) > 10 else messages historyList = [] for msg in recentMessages: if hasattr(msg, 'role') and hasattr(msg, 'message'): role = "User" if msg.role == "user" else "Assistant" message = msg.message[:200] + "..." if len(msg.message) > 200 else msg.message historyList.append(f"**{role}**: {message}") return "\n".join(historyList) if historyList else "No workflow history available" except Exception as e: logger.error(f"Error getting workflow history: {str(e)}") return "Error retrieving workflow history" def getAvailableMethods(services) -> str: """Get available methods for prompt context""" try: if not methods: discoverMethods(services) return getMethodsList(services) except Exception as e: logger.error(f"Error getting available methods: {str(e)}") return "Error retrieving available methods" def getEnhancedDocumentContext(services) -> str: """Get enhanced document context with full metadata""" try: # Get all documents from the current workflow workflow = getattr(services, 'workflow', None) if not workflow or not hasattr(workflow, 'id'): return "No workflow context available" # Get workflow documents documents = services.interfaceDbChat.getWorkflowDocuments(workflow.id) if not documents: return "No documents available" docList = [] for i, doc in enumerate(documents, 1): if isinstance(doc, ChatDocument): docInfo = f"{i}. **{doc.fileName}**" if hasattr(doc, 'mimeType') and doc.mimeType: docInfo += f" ({doc.mimeType})" if hasattr(doc, 'size') and doc.size: docInfo += f" - {doc.size} bytes" if hasattr(doc, 'created') and doc.created: docInfo += f" - Created: {doc.created}" if hasattr(doc, 'modified') and doc.modified: docInfo += f" - Modified: {doc.modified}" docList.append(docInfo) elif isinstance(doc, dict): docInfo = f"{i}. **{doc.get('fileName', 'Unknown')}**" if doc.get('mimeType'): docInfo += f" ({doc['mimeType']})" if doc.get('size'): docInfo += f" - {doc['size']} bytes" if doc.get('created'): docInfo += f" - Created: {doc['created']}" if doc.get('modified'): docInfo += f" - Modified: {doc['modified']}" docList.append(docInfo) else: docList.append(f"{i}. {str(doc)}") return "\n".join(docList) if docList else "No documents available" except Exception as e: logger.error(f"Error getting enhanced document context: {str(e)}") return "Error retrieving document context" def getConnectionReferenceList(services) -> List[str]: """Get list of available connections""" try: # Get connections from services if hasattr(services, 'connection') and hasattr(services.connection, 'getConnections'): connections = services.connection.getConnections() if connections: return [f"{conn.get('name', 'Unknown')} ({conn.get('type', 'Unknown')})" for conn in connections] return [] except Exception as e: logger.error(f"Error getting connection reference list: {str(e)}") return [] def getUserLanguage(services) -> str: """Get user language from services""" try: if hasattr(services, 'user') and hasattr(services.user, 'language'): return services.user.language or 'en' return 'en' except Exception as e: logger.error(f"Error getting user language: {str(e)}") return 'en' def getReviewContent(context: Any) -> str: """Get review content for prompt context""" try: if not context or not hasattr(context, 'observation'): return "No review content available" observation = context.observation if not isinstance(observation, dict): return "No review content available" reviewParts = [] # Add success status if 'success' in observation: reviewParts.append(f"Success: {observation['success']}") # Add documents count if 'documentsCount' in observation: reviewParts.append(f"Documents generated: {observation['documentsCount']}") # Add previews if 'previews' in observation and observation['previews']: reviewParts.append("Document previews:") for preview in observation['previews']: if isinstance(preview, dict): name = preview.get('name', 'Unknown') mimeType = preview.get('mimeType', 'Unknown') size = preview.get('contentSize', 'Unknown size') reviewParts.append(f" - {name} ({mimeType}) - {size}") # Add notes if 'notes' in observation and observation['notes']: reviewParts.append("Notes:") for note in observation['notes']: reviewParts.append(f" - {note}") return "\n".join(reviewParts) if reviewParts else "No review content available" except Exception as e: logger.error(f"Error getting review content: {str(e)}") return "Error retrieving review content" def getPreviousRoundContext(services, context: Any) -> str: """Get previous round context for prompt""" try: if not context or not hasattr(context, 'workflow_id'): return "No previous round context available" workflowId = context.workflow_id if not workflowId: return "No previous round context available" # Get previous round results previousResults = getattr(context, 'previous_results', []) if not previousResults: return "No previous round context available" contextList = [] for i, result in enumerate(previousResults, 1): if hasattr(result, 'success') and hasattr(result, 'resultLabel'): status = "Success" if result.success else "Failed" contextList.append(f"{i}. {result.resultLabel} - {status}") elif isinstance(result, dict): status = "Success" if result.get('success', False) else "Failed" label = result.get('resultLabel', 'Unknown') contextList.append(f"{i}. {label} - {status}") else: contextList.append(f"{i}. {str(result)}") return "\n".join(contextList) if contextList else "No previous round context available" except Exception as e: logger.error(f"Error getting previous round context: {str(e)}") return "Error retrieving previous round context"