From bd0d964e93991dd3cda0c24d091f8b8aba195cb7 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Sun, 24 Aug 2025 17:03:39 +0200 Subject: [PATCH] cleaned up code, removed sessions as jwt used, functions alignment --- app.py | 4 +- modules/chat/documents/documentExtraction.py | 86 +-- modules/chat/documents/documentGeneration.py | 204 ++--- modules/chat/documents/documentUtility.py | 26 +- modules/chat/handling/executionState.py | 23 - modules/chat/handling/handlingTasks.py | 614 +++++++++++++-- modules/chat/handling/promptFactory.py | 461 ++++++------ modules/chat/managerChat.py | 18 +- modules/chat/serviceCenter.py | 708 ++++++++++++------ modules/connectors/connectorDbJson.py | 23 +- modules/interfaces/interfaceAppAccess.py | 68 +- modules/interfaces/interfaceAppModel.py | 48 -- modules/interfaces/interfaceAppObjects.py | 57 +- modules/interfaces/interfaceChatModel.py | 127 +++- modules/interfaces/interfaceChatObjects.py | 458 +---------- modules/interfaces/interfaceComponentModel.py | 8 +- .../interfaces/interfaceComponentObjects.py | 252 +++---- modules/methods/methodAi.py | 20 +- modules/methods/methodDocument.py | 54 +- modules/methods/methodOutlook.py | 24 +- modules/methods/methodSharepoint.py | 2 +- modules/routes/routeDataFiles.py | 22 +- modules/routes/routeSecurityLocal.py | 4 +- modules/security/auth.py | 116 +-- modules/security/tokenManager.py | 28 +- modules/shared/attributeUtils.py | 8 +- modules/shared/timezoneUtils.py | 112 --- notes/changelog.txt | 136 ++-- test_documentExtraction.py | 22 +- test_excel_processing.py | 4 +- test_pydantic_compat.py | 100 +++ tests/run_timestamp_tests.py | 218 ------ tests/test_api_timestamps.py | 155 ---- tests/test_timestamp_models.py | 385 ---------- tool_showUnusedFunctions.py | 210 ++++++ 35 files changed, 2067 insertions(+), 2738 deletions(-) create mode 100644 test_pydantic_compat.py delete mode 100644 tests/run_timestamp_tests.py delete mode 100644 tests/test_api_timestamps.py delete mode 100644 tests/test_timestamp_models.py create mode 100644 tool_showUnusedFunctions.py diff --git a/app.py b/app.py index bfe82f8f..469653bc 100644 --- a/app.py +++ b/app.py @@ -109,9 +109,9 @@ def initLogging(): ) # Silence noisy third-party libraries - use the same level as the root logger - noisyLoggers = ["httpx", "httpcore", "urllib3", "asyncio", "fastapi.security.oauth2"] + noisyLoggers = ["httpx", "httpcore", "urllib3", "asyncio", "fastapi.security.oauth2", "msal"] for loggerName in noisyLoggers: - logging.getLogger(loggerName).setLevel(logLevel) + logging.getLogger(loggerName).setLevel(logging.WARNING) # Log the current logging configuration logger = logging.getLogger(__name__) diff --git a/modules/chat/documents/documentExtraction.py b/modules/chat/documents/documentExtraction.py index fcebf79e..ea96289d 100644 --- a/modules/chat/documents/documentExtraction.py +++ b/modules/chat/documents/documentExtraction.py @@ -159,13 +159,13 @@ class DocumentExtraction: "svg": 40000 # SVG content } - def _robustTextDecode(self, fileData: bytes, filename: str = "unknown") -> str: + def _robustTextDecode(self, fileData: bytes, fileName: str = "unknown") -> str: """ Robustly decode text data with multiple encoding fallbacks. Args: fileData: Raw bytes to decode - filename: Filename for logging purposes + fileName: fileName for logging purposes Returns: Decoded text string @@ -207,16 +207,16 @@ class DocumentExtraction: else: # Last resort: decode with replacement characters content = fileData.decode('utf-8', errors='replace') - logger.warning(f"{filename}: decoded with UTF-8 and replacement characters due to low encoding confidence") + logger.warning(f"{fileName}: decoded with UTF-8 and replacement characters due to low encoding confidence") return content except ImportError: # chardet not available, use replacement characters content = fileData.decode('utf-8', errors='replace') - logger.warning(f"{filename}: decoded with UTF-8 and replacement characters (chardet not available)") + logger.warning(f"{fileName}: decoded with UTF-8 and replacement characters (chardet not available)") return content # This should never be reached, but just in case - raise FileProcessingError(f"Failed to decode {filename} with any encoding") + raise FileProcessingError(f"Failed to decode {fileName} with any encoding") def initialize(self) -> None: """Initialize the document processor.""" @@ -262,13 +262,13 @@ class DocumentExtraction: - async def processFileData(self, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, prompt: str = None, documentId: str = None, enableAI: bool = True) -> ExtractedContent: + async def processFileData(self, fileData: bytes, fileName: str, mimeType: str, base64Encoded: bool = False, prompt: str = None, documentId: str = None, enableAI: bool = True) -> ExtractedContent: """ Process file data directly and extract its contents with optional AI processing. Args: fileData: Raw file data as bytes - filename: Name of the file + fileName: Name of the file mimeType: MIME type of the file base64Encoded: Whether the data is base64 encoded prompt: Prompt for AI content extraction @@ -287,13 +287,13 @@ class DocumentExtraction: fileData = base64.b64decode(fileData) # Use documentUtility for mime type detection if mimeType == "application/octet-stream": - mimeType = detectMimeTypeFromData(fileData, filename, self._serviceCenter) + mimeType = detectMimeTypeFromData(fileData, fileName, self._serviceCenter) # Process document based on type if mimeType not in self.supportedTypes: - contentItems = await self._processBinary(fileData, filename, mimeType) + contentItems = await self._processBinary(fileData, fileName, mimeType) else: processor = self.supportedTypes[mimeType] - contentItems = await processor(fileData, filename, mimeType) + contentItems = await processor(fileData, fileName, mimeType) # Process with AI if prompt provided and AI is enabled if enableAI and prompt and contentItems: @@ -304,7 +304,7 @@ class DocumentExtraction: except Exception as e: logger.error(f"Error processing content with AI: {str(e)}") elif not enableAI: - logger.debug(f"AI processing disabled for {filename}, returning raw extracted content") + logger.debug(f"AI processing disabled for {fileName}, returning raw extracted content") return ExtractedContent( id=documentId if documentId else str(uuid.uuid4()), @@ -317,14 +317,14 @@ class DocumentExtraction: - async def _processText(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processText(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process text document with robust encoding detection and complete content extraction""" try: - content = self._robustTextDecode(fileData, filename) + content = self._robustTextDecode(fileData, fileName) # Validate that we got the complete content if not content or len(content.strip()) == 0: - logger.warning(f"Empty content extracted from {filename}") + logger.warning(f"Empty content extracted from {fileName}") return [ContentItem( label="empty", data="[Empty file or no readable content]", @@ -341,7 +341,7 @@ class DocumentExtraction: # Use documentUtility for mime type - mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) + mime_type = getMimeTypeFromExtension(getFileExtension(fileName), self._serviceCenter) return [ContentItem( label="main", data=content, @@ -356,11 +356,11 @@ class DocumentExtraction: logger.error(f"Error processing text document: {str(e)}") raise FileProcessingError(f"Failed to process text document: {str(e)}") - async def _processCsv(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processCsv(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process CSV document with robust encoding detection""" try: - content = self._robustTextDecode(fileData, filename) - mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) + content = self._robustTextDecode(fileData, fileName) + mime_type = getMimeTypeFromExtension(getFileExtension(fileName), self._serviceCenter) return [ContentItem( label="main", data=content, @@ -375,12 +375,12 @@ class DocumentExtraction: logger.error(f"Error processing CSV document: {str(e)}") raise FileProcessingError(f"Failed to process CSV document: {str(e)}") - async def _processJson(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processJson(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process JSON document with robust encoding detection""" try: - content = self._robustTextDecode(fileData, filename) + content = self._robustTextDecode(fileData, fileName) jsonData = json.loads(content) - mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) + mime_type = getMimeTypeFromExtension(getFileExtension(fileName), self._serviceCenter) return [ContentItem( label="main", data=content, @@ -395,11 +395,11 @@ class DocumentExtraction: logger.error(f"Error processing JSON document: {str(e)}") raise FileProcessingError(f"Failed to process JSON document: {str(e)}") - async def _processXml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processXml(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process XML document with robust encoding detection""" try: - content = self._robustTextDecode(fileData, filename) - mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) + content = self._robustTextDecode(fileData, fileName) + mime_type = getMimeTypeFromExtension(getFileExtension(fileName), self._serviceCenter) return [ContentItem( label="main", data=content, @@ -414,11 +414,11 @@ class DocumentExtraction: logger.error(f"Error processing XML document: {str(e)}") raise FileProcessingError(f"Failed to process XML document: {str(e)}") - async def _processHtml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processHtml(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process HTML document with robust encoding detection""" try: - content = self._robustTextDecode(fileData, filename) - mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) + content = self._robustTextDecode(fileData, fileName) + mime_type = getMimeTypeFromExtension(getFileExtension(fileName), self._serviceCenter) return [ContentItem( label="main", data=content, @@ -433,10 +433,10 @@ class DocumentExtraction: logger.error(f"Error processing HTML document: {str(e)}") raise FileProcessingError(f"Failed to process HTML document: {str(e)}") - async def _processSvg(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processSvg(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process SVG document with robust encoding detection and meaningful content extraction""" try: - content = self._robustTextDecode(fileData, filename) + content = self._robustTextDecode(fileData, fileName) # Check if it's actually SVG content if " List[ContentItem]: + async def _processImage(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process image document""" try: self._loadImageProcessor() @@ -592,7 +592,7 @@ class DocumentExtraction: logger.error(f"Error processing image document: {str(e)}") raise FileProcessingError(f"Failed to process image document: {str(e)}") - async def _processPdf(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processPdf(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process PDF document""" try: self._loadPdfExtractor() @@ -661,7 +661,7 @@ class DocumentExtraction: logger.error(f"Error processing PDF document: {str(e)}") raise FileProcessingError(f"Failed to process PDF document: {str(e)}") - async def _processDocx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processDocx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process Word document with enhanced formatting preservation""" try: self._loadOfficeExtractor() @@ -853,7 +853,7 @@ class DocumentExtraction: logger.error(f"Error processing Word document: {str(e)}") raise FileProcessingError(f"Failed to process Word document: {str(e)}") - async def _processXlsx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processXlsx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process Excel document with enhanced table extraction and metadata""" try: self._loadOfficeExtractor() @@ -867,7 +867,7 @@ class DocumentExtraction: workbook = openpyxl.load_workbook(xlsxStream, data_only=True) except Exception as load_error: - logger.error(f"Failed to load Excel workbook {filename}: {str(load_error)}") + logger.error(f"Failed to load Excel workbook {fileName}: {str(load_error)}") raise FileProcessingError(f"Failed to load Excel workbook: {str(load_error)}") # Extract workbook properties safely @@ -1122,7 +1122,7 @@ class DocumentExtraction: logger.error(f"Error processing Excel document: {str(e)}") raise FileProcessingError(f"Failed to process Excel document: {str(e)}") - async def _processLegacyDoc(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processLegacyDoc(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process legacy Word .doc document""" try: # Try to use antiword or similar tools for .doc files @@ -1130,7 +1130,7 @@ class DocumentExtraction: contentItems = [] # Create a basic content item explaining the limitation - info_content = f"""Legacy Word Document (.doc) - {filename} + info_content = f"""Legacy Word Document (.doc) - {fileName} Note: This is a legacy .doc format file. For better content extraction, consider converting to .docx format. @@ -1173,7 +1173,7 @@ The raw binary content is available but not human-readable.""" logger.error(f"Error processing legacy Word document: {str(e)}") raise FileProcessingError(f"Failed to process legacy Word document: {str(e)}") - async def _processLegacyXls(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processLegacyXls(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process legacy Excel .xls document""" try: # Try to use xlrd or similar tools for .xls files @@ -1181,7 +1181,7 @@ The raw binary content is available but not human-readable.""" contentItems = [] # Create a basic content item explaining the limitation - info_content = f"""Legacy Excel Document (.xls) - {filename} + info_content = f"""Legacy Excel Document (.xls) - {fileName} Note: This is a legacy .xls format file. For better content extraction, consider converting to .xlsx format. @@ -1224,7 +1224,7 @@ The raw binary content is available but not human-readable.""" logger.error(f"Error processing legacy Excel document: {str(e)}") raise FileProcessingError(f"Failed to process legacy Excel document: {str(e)}") - async def _processLegacyPpt(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processLegacyPpt(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process legacy PowerPoint .ppt document""" try: # Try to use python-pptx or similar tools for .ppt files @@ -1232,7 +1232,7 @@ The raw binary content is available but not human-readable.""" contentItems = [] # Create a basic content item explaining the limitation - info_content = f"""Legacy PowerPoint Document (.ppt) - {filename} + info_content = f"""Legacy PowerPoint Document (.ppt) - {fileName} Note: This is a legacy .ppt format file. For better content extraction, consider converting to .pptx format. @@ -1275,7 +1275,7 @@ The raw binary content is available but not human-readable.""" logger.error(f"Error processing legacy PowerPoint document: {str(e)}") raise FileProcessingError(f"Failed to process legacy PowerPoint document: {str(e)}") - async def _processPptx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processPptx(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process PowerPoint document""" try: self._loadOfficeExtractor() @@ -1351,7 +1351,7 @@ The raw binary content is available but not human-readable.""" logger.error(f"Error processing PowerPoint document: {str(e)}") raise FileProcessingError(f"Failed to process PowerPoint document: {str(e)}") - async def _processBinary(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: + async def _processBinary(self, fileData: bytes, fileName: str, mimeType: str) -> List[ContentItem]: """Process binary document""" try: return [ContentItem( diff --git a/modules/chat/documents/documentGeneration.py b/modules/chat/documents/documentGeneration.py index 27525da3..9907d0b0 100644 --- a/modules/chat/documents/documentGeneration.py +++ b/modules/chat/documents/documentGeneration.py @@ -23,7 +23,7 @@ class DocumentGenerator: """ try: # Read documents from the standard documents field (not data.documents) - documents = action_result.documents if hasattr(action_result, 'documents') else [] + documents = action_result.documents if action_result and hasattr(action_result, 'documents') else [] if not documents: logger.info(f"No documents found in action_result.documents for {action.execMethod}.{action.execAction}") @@ -56,152 +56,21 @@ class DocumentGenerator: return [] def processSingleDocument(self, doc: Any, action) -> Optional[Dict[str, Any]]: - """Process a single document from action result""" + """Process a single document from action result with simplified logic""" try: - if hasattr(doc, 'filename') and doc.filename: - # Document object with filename attribute - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - if mime_type == "application/octet-stream": - content = getattr(doc, 'content', '') - mime_type = detectMimeTypeFromContent(content, doc.filename, self.service) - - # Add result label to filename for document objects too - base_filename = doc.filename - if hasattr(action, 'execResultLabel') and action.execResultLabel: - result_label = action.execResultLabel.strip() - if result_label: - # Check if filename already starts with resultLabel to avoid duplication - if not base_filename.startswith(f"{result_label}-"): - base_filename = f"{result_label}-{base_filename}" - logger.info(f"Added resultLabel '{result_label}' as prefix to document object filename: {base_filename}") - else: - logger.info(f"Document object filename already has resultLabel prefix: {base_filename}") - - return { - 'filename': base_filename, - 'fileSize': getattr(doc, 'fileSize', 0), - 'mimeType': mime_type, - 'content': getattr(doc, 'content', ''), - 'document': doc - } - elif hasattr(doc, 'documentName') and doc.documentName: - # ActionDocument object with documentName attribute - base_filename = doc.documentName - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - content = getattr(doc, 'documentData', '') - - # Add result label to filename for ActionDocument objects - if hasattr(action, 'execResultLabel') and action.execResultLabel: - result_label = action.execResultLabel.strip() - if result_label: - # Check if filename already starts with resultLabel to avoid duplication - if not base_filename.startswith(f"{result_label}-"): - base_filename = f"{result_label}-{base_filename}" - logger.info(f"Added resultLabel '{result_label}' as prefix to ActionDocument filename: {base_filename}") - else: - logger.info(f"ActionDocument filename already has resultLabel prefix: {base_filename}") - - # Calculate file size from actual content - fileSize = len(str(content)) if content else 0 - - logger.info(f"Processed ActionDocument: {base_filename}, content length: {len(str(content))}, mimeType: {mime_type}") - - return { - 'filename': base_filename, - 'fileSize': fileSize, - 'mimeType': mime_type, - 'content': content, - 'document': doc - } - elif isinstance(doc, dict): - # Dictionary format document - handle both 'documentName' and 'filename' keys - base_filename = doc.get('documentName', doc.get('filename', '')) - - # Debug logging for resultLabel - if hasattr(action, 'execResultLabel'): - logger.info(f"Action {action.execMethod}.{action.execAction} has execResultLabel: '{action.execResultLabel}' (type: {type(action.execResultLabel)})") - else: - logger.info(f"Action {action.execMethod}.{action.execAction} has NO execResultLabel attribute") - - # If no filename provided, generate one with action info - if not base_filename: - timestamp = int(get_utc_timestamp()) - base_filename = f"{action.execMethod}_{action.execAction}_{timestamp}" - - # ALWAYS add result label to filename for better document selection - # This ensures consistent naming regardless of whether filename was provided or generated - if hasattr(action, 'execResultLabel') and action.execResultLabel: - result_label = action.execResultLabel.strip() - if result_label: - # Check if filename already starts with resultLabel to avoid duplication - if not base_filename.startswith(f"{result_label}-"): - base_filename = f"{result_label}-{base_filename}" - logger.info(f"Added resultLabel '{result_label}' as prefix to filename: {base_filename}") - else: - logger.info(f"Filename already has resultLabel prefix: {base_filename}") - else: - logger.info(f"No resultLabel available for action {action.execMethod}.{action.execAction}") - - filename = base_filename - mimeType = doc.get('mimeType', 'application/octet-stream') - - # Handle documentData structure - it might be a dict with 'content' key or direct content - document_data = doc.get('documentData', '') - if isinstance(document_data, dict) and 'content' in document_data: - # This is the structure returned by extract action: documentData.content - content = document_data['content'] - # Also check for other potential content fields - if not content and 'data' in document_data: - content = document_data['data'] - else: - # Direct content (fallback) - content = document_data - - # Calculate file size from actual content - fileSize = len(str(content)) if content else 0 - - # Detect mime type if not specified - if mimeType == "application/octet-stream": - mimeType = detectMimeTypeFromContent(content, filename, self.service) - - logger.info(f"Processed document: {filename}, content length: {len(str(content))}, mimeType: {mimeType}") - - return { - 'filename': filename, - 'fileSize': fileSize, - 'mimeType': mimeType, - 'content': content, - 'document': doc - } - else: - # Unknown document type - logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}") - timestamp = int(get_utc_timestamp()) - base_filename = f"{action.execMethod}_{action.execAction}_{timestamp}" - - # ALWAYS add result label to filename for better document selection - # This ensures consistent naming regardless of document type - if hasattr(action, 'execResultLabel') and action.execResultLabel: - result_label = action.execResultLabel.strip() - if result_label: - # Check if filename already starts with resultLabel to avoid duplication - if not base_filename.startswith(f"{result_label}-"): - base_filename = f"{result_label}-{base_filename}" - logger.info(f"Added resultLabel '{result_label}' as prefix to fallback filename: {base_filename}") - else: - logger.info(f"Fallback filename already has resultLabel prefix: {base_filename}") - else: - logger.info(f"No resultLabel available for action {action.execMethod}.{action.execAction}") - - filename = base_filename - mimeType = detectMimeTypeFromContent(doc, filename, self.service) - return { - 'filename': filename, - 'fileSize': 0, - 'mimeType': mimeType, - 'content': str(doc), - 'document': doc - } + # ActionDocument objects have documentName, documentData, and mimeType + mime_type = doc.mimeType + if mime_type == "application/octet-stream": + content = doc.documentData + mime_type = detectMimeTypeFromContent(content, doc.documentName, self.service) + + return { + 'fileName': doc.documentName, + 'fileSize': len(str(doc.documentData)), + 'mimeType': mime_type, + 'content': doc.documentData, + 'document': doc + } except Exception as e: logger.error(f"Error processing single document: {str(e)}") return None @@ -209,7 +78,7 @@ class DocumentGenerator: def createDocumentsFromActionResult(self, action_result, action, workflow) -> List[Any]: """ Create actual document objects from action result and store them in the system. - Returns a list of created document objects. + Returns a list of created document objects with proper workflow context. """ try: logger.info(f"Creating documents from action result for {action.execMethod}.{action.execAction}") @@ -221,7 +90,7 @@ class DocumentGenerator: created_documents = [] for i, doc_data in enumerate(processed_docs): try: - document_name = doc_data['filename'] + document_name = doc_data['fileName'] document_data = doc_data['content'] mime_type = doc_data['mimeType'] @@ -260,12 +129,14 @@ class DocumentGenerator: existing_file_id=file_id ) if document: + # Set workflow context on the document if possible + self._setDocumentWorkflowContext(document, action, workflow) created_documents.append(document) - logger.info(f"Successfully created ChatDocument: {document_name} (ID: {getattr(document, 'id', 'N/A')}, fileId: {getattr(document, 'fileId', 'N/A')})") + logger.info(f"Successfully created ChatDocument: {document_name} (ID: {document.id if hasattr(document, 'id') else 'N/A'}, fileId: {document.fileId if hasattr(document, 'fileId') else 'N/A'})") else: logger.error(f"Failed to create ChatDocument object for {document_name}") except Exception as e: - logger.error(f"Error creating document {doc_data.get('filename', 'unknown')}: {str(e)}") + logger.error(f"Error creating document {doc_data.get('fileName', 'unknown')}: {str(e)}") continue logger.info(f"Successfully created {len(created_documents)} documents") @@ -273,3 +144,36 @@ class DocumentGenerator: except Exception as e: logger.error(f"Error creating documents from action result: {str(e)}") return [] + + def _setDocumentWorkflowContext(self, document, action, workflow): + """Set workflow context on a document for proper routing and labeling""" + try: + # Get current workflow context from service center + workflow_context = self.service.getWorkflowContext() + workflow_stats = self.service.getWorkflowStats() + + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + current_action = workflow_context.get('currentAction', 0) + + # Try to set workflow context attributes if they exist + if hasattr(document, 'roundNumber'): + document.roundNumber = current_round + if hasattr(document, 'taskNumber'): + document.taskNumber = current_task + if hasattr(document, 'actionNumber'): + document.actionNumber = current_action + if hasattr(document, 'actionId'): + document.actionId = action.id if hasattr(action, 'id') else None + + # Set additional workflow metadata if available + if hasattr(document, 'workflowId'): + document.workflowId = workflow_stats.get('workflowId', workflow.id if hasattr(workflow, 'id') else None) + if hasattr(document, 'workflowStatus'): + document.workflowStatus = workflow_stats.get('workflowStatus', workflow.status if hasattr(workflow, 'status') else 'unknown') + + logger.debug(f"Set workflow context on document: Round {current_round}, Task {current_task}, Action {current_action}") + logger.debug(f"Document workflow metadata: ID={document.workflowId if hasattr(document, 'workflowId') else 'N/A'}, Status={document.workflowStatus if hasattr(document, 'workflowStatus') else 'N/A'}") + + except Exception as e: + logger.warning(f"Could not set workflow context on document: {str(e)}") diff --git a/modules/chat/documents/documentUtility.py b/modules/chat/documents/documentUtility.py index 1b74b475..3d674720 100644 --- a/modules/chat/documents/documentUtility.py +++ b/modules/chat/documents/documentUtility.py @@ -4,10 +4,10 @@ from typing import Any, Dict logger = logging.getLogger(__name__) -def getFileExtension(filename: str) -> str: - """Extract file extension from filename""" - if '.' in filename: - return filename.rsplit('.', 1)[-1].lower() +def getFileExtension(fileName: str) -> str: + """Extract file extension from fileName""" + if '.' in fileName: + return fileName.rsplit('.', 1)[-1].lower() return '' def getMimeTypeFromExtension(extension: str, service=None) -> str: @@ -36,22 +36,22 @@ def getMimeTypeFromExtension(extension: str, service=None) -> str: } return mapping.get(extension.lower(), 'application/octet-stream') -def detectMimeTypeFromData(file_bytes: bytes, filename: str, service=None) -> str: - """Detect MIME type from file bytes and filename using a service if provided.""" +def detectMimeTypeFromData(file_bytes: bytes, fileName: str, service=None) -> str: + """Detect MIME type from file bytes and fileName using a service if provided.""" try: if service: - detected = service.detectContentTypeFromData(file_bytes, filename) + detected = service.detectContentTypeFromData(file_bytes, fileName) if detected and detected != 'application/octet-stream': return detected # Fallback: guess from extension - ext = getFileExtension(filename) + ext = getFileExtension(fileName) return getMimeTypeFromExtension(ext, service) except Exception as e: - logger.warning(f"Error in MIME type detection for {filename}: {str(e)}") + logger.warning(f"Error in MIME type detection for {fileName}: {str(e)}") return 'application/octet-stream' -def detectMimeTypeFromContent(content: Any, filename: str, service=None) -> str: - """Detect MIME type from content and filename using a service if provided.""" +def detectMimeTypeFromContent(content: Any, fileName: str, service=None) -> str: + """Detect MIME type from content and fileName using a service if provided.""" try: if isinstance(content, str): file_bytes = content.encode('utf-8') @@ -59,9 +59,9 @@ def detectMimeTypeFromContent(content: Any, filename: str, service=None) -> str: file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8') else: file_bytes = str(content).encode('utf-8') - return detectMimeTypeFromData(file_bytes, filename, service) + return detectMimeTypeFromData(file_bytes, fileName, service) except Exception as e: - logger.warning(f"Error in MIME type detection for {filename}: {str(e)}") + logger.warning(f"Error in MIME type detection for {fileName}: {str(e)}") return 'application/octet-stream' def convertDocumentDataToString(document_data: Any, file_extension: str) -> str: diff --git a/modules/chat/handling/executionState.py b/modules/chat/handling/executionState.py index ce0ea95a..1f806745 100644 --- a/modules/chat/handling/executionState.py +++ b/modules/chat/handling/executionState.py @@ -29,29 +29,6 @@ class TaskExecutionState: self.failed_actions.append(action_result) self.current_action_index += 1 - def getAvailableResults(self) -> list: - """Get available results from successful actions""" - results = [] - for action in self.successful_actions: - if action.documents: - # Extract text content from documents - for doc in action.documents: - if hasattr(doc, 'documentData'): - if isinstance(doc.documentData, dict): - result_text = doc.documentData.get("result", "") - elif isinstance(doc.documentData, str): - result_text = doc.documentData - else: - result_text = str(doc.documentData) - - if result_text and result_text.strip(): - results.append(result_text) - return results - - def shouldRetryTask(self) -> bool: - """Determine if task should be retried based on failure patterns""" - return len(self.failed_actions) > 0 and self.canRetry() - def canRetry(self) -> bool: """Check if task can be retried""" return self.retry_count < self.max_retries diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py index 8d783264..8a0bc21f 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/chat/handling/handlingTasks.py @@ -12,8 +12,13 @@ from modules.interfaces.interfaceChatModel import ( ) from modules.shared.timezoneUtils import get_utc_timestamp from .executionState import TaskExecutionState -from .promptFactory import createTaskPlanningPrompt, createActionDefinitionPrompt, createResultReviewPrompt +from .promptFactory import ( + createTaskPlanningPrompt, + createActionDefinitionPrompt, + createResultReviewPrompt +) from modules.chat.documents.documentGeneration import DocumentGenerator +import uuid logger = logging.getLogger(__name__) @@ -58,16 +63,43 @@ class HandlingTasks: logger.info(f"Generating task plan for workflow {workflow.id}") available_docs = self.service.getAvailableDocuments(workflow) + # Set initial workflow context + self.service.setWorkflowContext(round_number=1, task_number=0, action_number=0) # Check workflow status before calling AI service self._checkWorkflowStopped() + # Create proper context object for task planning + # For task planning, we need to create a minimal TaskStep since TaskContext requires it + from modules.interfaces.interfaceChatModel import TaskStep + planning_task_step = TaskStep( + id="planning", + objective=userInput, + dependencies=[], + success_criteria=[], + estimated_complexity="medium" + ) + + task_planning_context = TaskContext( + task_step=planning_task_step, + workflow=workflow, + workflow_id=workflow.id, + available_documents=available_docs, + available_connections=[], + previous_results=[], + previous_handover=None, + improvements=[], + retry_count=0, + previous_action_results=[], + previous_review_result=None, + is_regeneration=False, + failure_patterns=[], + failed_actions=[], + successful_actions=[] + ) + prompt = await self.service.callAiTextAdvanced( - createTaskPlanningPrompt({ - 'user_request': userInput, - 'available_documents': available_docs, - 'workflow_id': workflow.id - }) + createTaskPlanningPrompt(task_planning_context, self.service) ) # Inline _parseTaskPlanResponse logic try: @@ -100,12 +132,16 @@ class HandlingTasks: tasks=tasks ) + # Set workflow totals for progress tracking + total_tasks = len(tasks) + self.service.setWorkflowTotals(total_tasks=total_tasks) + logger.info(f"Task plan generated successfully with {len(tasks)} tasks") # Log the generated tasks for i, task in enumerate(tasks): logger.info(f" Task {i+1}: {task.objective}") - if hasattr(task, 'success_criteria') and task.success_criteria: + if task.success_criteria: logger.info(f" Success criteria: {task.success_criteria}") # Log the complete task plan @@ -118,11 +154,220 @@ class HandlingTasks: logger.info(f"AI Response with task plan: {prompt}") logger.info("=== END RAW AI TASK PLAN JSON ===") + # PHASE 3: Create chat message containing the task plan + await self.createTaskPlanMessage(task_plan, workflow) + return task_plan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") raise + async def createTaskPlanMessage(self, task_plan: TaskPlan, workflow): + """Create a chat message containing the task plan with user-friendly messages""" + try: + # Build task plan summary + task_summary = f"📋 **Task Plan Generated**\n\n" + task_summary += f"**Overview:** {task_plan.overview}\n\n" + task_summary += f"**Total Tasks:** {len(task_plan.tasks)}\n\n" + + # Add each task with its user message + for i, task in enumerate(task_plan.tasks): + task_summary += f"**Task {i+1}:** {task.objective}\n" + if task.userMessage: + task_summary += f" 💬 {task.userMessage}\n" + if task.success_criteria: + criteria_str = ', '.join(task.success_criteria) + task_summary += f" ✅ Success Criteria: {criteria_str}\n" + task_summary += "\n" + + # Get overall user message from task plan if available + overall_message = task_plan.userMessage + if overall_message: + task_summary += f"**Plan Summary:** {overall_message}\n\n" + + # Create workflow message + message_data = { + "workflowId": workflow.id, + "role": "assistant", + "message": task_summary, + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": "task_plan", + "documents": [], + # Add workflow context fields + "roundNumber": 1, # Task plan is always round 1 + "taskNumber": 0, # Task plan is before individual tasks + "actionNumber": 0 + } + + message = self.chatInterface.createWorkflowMessage(message_data) + if message: + workflow.messages.append(message) + logger.info(f"Task plan message created with {len(task_plan.tasks)} tasks") + else: + logger.error("Failed to create task plan message") + + except Exception as e: + logger.error(f"Error creating task plan message: {str(e)}") + + async def createDocumentContextMessage(self, documents: List, workflow): + """Create a chat message with enhanced document context and workflow labeling""" + try: + from .promptFactory import createDocumentContextPrompt + + # Get user language from service + user_language = self.service.user.language if self.service and self.service.user else 'en' + + # Get current workflow context and stats + workflow_context = self.service.getWorkflowContext() + workflow_stats = self.service.getWorkflowStats() + + # Build context for the document context prompt + context = { + 'documents': documents, + 'workflow_context': { + 'currentRound': workflow_context.get('currentRound', 1), + 'totalTasks': workflow_stats.get('totalTasks', 0), + 'currentTask': workflow_context.get('currentTask', 0), + 'totalActions': workflow_stats.get('totalActions', 0), + 'currentAction': workflow_context.get('currentAction', 0), + 'workflowStatus': workflow_stats.get('workflowStatus', 'unknown'), + 'workflowId': workflow_stats.get('workflowId', 'unknown') + }, + 'user_language': user_language + } + + # Generate enhanced document context using AI + prompt = createDocumentContextPrompt(context) + response = await self.service.callAiTextAdvanced(prompt) + + # Parse the AI response + try: + json_start = response.find('{') + json_end = response.find('}') + 1 + if json_start != -1 and json_end > 0: + json_str = response[json_start:json_end] + doc_context = json.loads(json_str) + + # Build message from AI response + message_text = f"📄 **Document Context**\n\n" + message_text += f"**Summary:** {doc_context.get('documentSummary', 'No summary available')}\n\n" + message_text += f"**Workflow Progress:** {doc_context.get('workflowProgress', 'No progress info')}\n\n" + + # Add workflow context information + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}\n\n" + + # Add overall user message if available + overall_message = doc_context.get('overallUserMessage') + if overall_message: + message_text += f"💬 {overall_message}\n\n" + + # Add document details + document_details = doc_context.get('documentDetails', []) + if document_details: + message_text += "**Document Details:**\n" + for doc_detail in document_details: + message_text += f"- {doc_detail.get('workflowLabel', 'Unknown')}: {doc_detail.get('fileName', 'Unknown file')}\n" + user_msg = doc_detail.get('userMessage') + if user_msg: + message_text += f" 💬 {user_msg}\n" + message_text += "\n" + else: + # Fallback if AI response parsing fails + message_text = f"📄 **Document Context**\n\n" + message_text += f"**Total Documents:** {len(documents)}\n\n" + + # Add workflow context information even in fallback + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}\n\n" + + message_text += "Document context information is available for processing." + + except Exception as e: + logger.error(f"Error parsing document context AI response: {str(e)}") + # Fallback message with workflow context + message_text = f"📄 **Document Context**\n\n" + message_text += f"**Total Documents:** {len(documents)}\n\n" + + # Add workflow context information in fallback + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}\n\n" + + message_text += "Document context information is available for processing." + + # Create workflow message + message_data = { + "workflowId": workflow.id, + "role": "assistant", + "message": message_text, + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": get_utc_timestamp(), + "documentsLabel": "document_context", + "documents": documents, + # Add workflow context fields + "roundNumber": workflow_context.get('currentRound', 1), + "taskNumber": workflow_context.get('currentTask', 0), + "actionNumber": workflow_context.get('currentAction', 0) + } + + message = self.chatInterface.createWorkflowMessage(message_data) + if message: + workflow.messages.append(message) + logger.info(f"Document context message created with {len(documents)} documents") + else: + logger.error("Failed to create document context message") + + except Exception as e: + logger.error(f"Error creating document context message: {str(e)}") + async def generateTaskActions(self, task_step, workflow, previous_results=None, enhanced_context=None) -> List[TaskAction]: """Generate actions for a given task step.""" try: @@ -134,27 +379,51 @@ class HandlingTasks: available_docs = self.service.getAvailableDocuments(workflow) available_connections = self.service.getConnectionReferenceList() + # Create proper context object for action definition + if enhanced_context and isinstance(enhanced_context, TaskContext): + # Use existing TaskContext if provided + action_context = TaskContext( + task_step=enhanced_context.task_step, + workflow=enhanced_context.workflow, + workflow_id=enhanced_context.workflow_id, + available_documents=enhanced_context.available_documents or available_docs, + available_connections=enhanced_context.available_connections or available_connections, + previous_results=enhanced_context.previous_results or previous_results or [], + previous_handover=enhanced_context.previous_handover, + improvements=enhanced_context.improvements or [], + retry_count=enhanced_context.retry_count or 0, + previous_action_results=enhanced_context.previous_action_results or [], + previous_review_result=enhanced_context.previous_review_result, + is_regeneration=enhanced_context.is_regeneration or False, + failure_patterns=enhanced_context.failure_patterns or [], + failed_actions=enhanced_context.failed_actions or [], + successful_actions=enhanced_context.successful_actions or [] + ) + else: + # Create new context from scratch + action_context = TaskContext( + task_step=task_step, + workflow=workflow, + workflow_id=workflow.id, + available_documents=available_docs, + available_connections=available_connections, + previous_results=previous_results or [], + previous_handover=None, + improvements=[], + retry_count=0, + previous_action_results=[], + previous_review_result=None, + is_regeneration=False, + failure_patterns=[], + failed_actions=[], + successful_actions=[] + ) - context = enhanced_context or TaskContext( - task_step=task_step, - workflow=workflow, - workflow_id=workflow.id, - available_documents=available_docs, - previous_results=previous_results or [], - improvements=[], - retry_count=0, - previous_action_results=[], - previous_review_result=None, - is_regeneration=False, - failure_patterns=[], - failed_actions=[], - successful_actions=[] - ) # Check workflow status before calling AI service self._checkWorkflowStopped() prompt = await self.service.callAiTextAdvanced( - await createActionDefinitionPrompt(context, self.service) + await createActionDefinitionPrompt(action_context, self.service) ) # Inline parseActionResponse logic here json_start = prompt.find('{') @@ -170,18 +439,20 @@ class HandlingTasks: if 'actions' not in action_data: raise ValueError("Action response missing 'actions' field") actions = action_data['actions'] - if not self._validateActions(actions, context): + if not self._validateActions(actions, action_context): logger.error("Generated actions failed validation") raise Exception("AI-generated actions failed validation - AI is required for action generation") # Convert to TaskAction objects - task_actions = [self.chatInterface.createTaskAction({ + task_actions = [self.createTaskAction({ "execMethod": a.get('method', 'unknown'), "execAction": a.get('action', 'unknown'), "execParameters": a.get('parameters', {}), "execResultLabel": a.get('resultLabel', ''), "expectedDocumentFormats": a.get('expectedDocumentFormats', None), - "status": TaskStatus.PENDING + "status": TaskStatus.PENDING, + # Extract user-friendly message if available + "userMessage": a.get('userMessage', None) }) for a in actions] valid_actions = [ta for ta in task_actions if ta] @@ -214,6 +485,11 @@ class HandlingTasks: """Execute all actions for a task step, with state management and retries.""" logger.info(f"=== STARTING TASK {task_index or '?'}: {task_step.objective} ===") + # Update workflow context for this task + if task_index is not None: + self.service.setWorkflowContext(task_number=task_index) + self.service.incrementWorkflowContext('task') + # Create database log entry for task start in format expected by frontend if task_index is not None: if total_tasks is not None: @@ -239,9 +515,17 @@ class HandlingTasks: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": f"task_{task_index}_start", - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": 1, # Task start is always round 1 + "taskNumber": task_index, + "actionNumber": 0 } + # Add user-friendly message if available + if task_step.userMessage: + task_start_message["message"] += f"\n\n💬 {task_step.userMessage}" + message = self.chatInterface.createWorkflowMessage(task_start_message) if message: workflow.messages.append(message) @@ -256,6 +540,10 @@ class HandlingTasks: # Check workflow status before starting task execution self._checkWorkflowStopped() + # Update retry context with current attempt information + if retry_context: + retry_context.retry_count = attempt + 1 + actions = await self.generateTaskActions(task_step, workflow, previous_results=retry_context.previous_results, enhanced_context=retry_context) if not actions: logger.error("No actions defined for task step, aborting task execution") @@ -265,13 +553,25 @@ class HandlingTasks: total_actions = len(actions) logger.info(f"Task {task_index or '?'} has {total_actions} actions") + # Set workflow action total for this task + self.service.setWorkflowTotals(total_actions=total_actions) + + # Create document context message if documents are available + available_docs = self.service.getAvailableDocuments(workflow) + if available_docs: + await self.createDocumentContextMessage(available_docs, workflow) + action_results = [] for action_idx, action in enumerate(actions): # Check workflow status before each action execution self._checkWorkflowStopped() - # Log action start in format expected by frontend + # Update workflow context for this action action_number = action_idx + 1 + self.service.setWorkflowContext(action_number=action_number) + self.service.incrementWorkflowContext('action') + + # Log action start in format expected by frontend logger.info(f"Task {task_index} - Starting action {action_number}/{total_actions}") # Create database log entry for action start @@ -293,6 +593,17 @@ class HandlingTasks: "documents": [] } + # Add user-friendly message if available + if action.userMessage: + action_start_message["message"] += f"\n\n💬 {action.userMessage}" + + # Add workflow context fields + action_start_message.update({ + "roundNumber": 1, # Action start is always round 1 + "taskNumber": task_index, + "actionNumber": action_number + }) + message = self.chatInterface.createWorkflowMessage(action_start_message) if message: workflow.messages.append(message) @@ -340,9 +651,17 @@ class HandlingTasks: "sequenceNr": len(workflow.messages) + 1, "publishedAt": get_utc_timestamp(), "documentsLabel": f"task_{task_index}_completion", - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": 1, # Task completion is always round 1 + "taskNumber": task_index, + "actionNumber": 0 } + # Add user-friendly message if available + if task_step.userMessage: + task_completion_message["message"] += f"\n\n💬 {task_step.userMessage}" + message = self.chatInterface.createWorkflowMessage(task_completion_message) if message: workflow.messages.append(message) @@ -358,14 +677,18 @@ class HandlingTasks: elif review_result.status == 'retry' and state.canRetry(): logger.warning(f"Task step '{task_step.objective}' requires retry: {review_result.improvements}") state.incrementRetryCount() - retry_context.retry_count = state.retry_count - retry_context.improvements = review_result.improvements - retry_context.previous_action_results = action_results - retry_context.previous_review_result = review_result - retry_context.is_regeneration = True - retry_context.failure_patterns = state.getFailurePatterns() - retry_context.failed_actions = state.failed_actions - retry_context.successful_actions = state.successful_actions + + # Update retry context with retry information + if retry_context: + retry_context.retry_count = state.retry_count + retry_context.improvements = review_result.improvements + retry_context.previous_action_results = action_results + retry_context.previous_review_result = review_result + retry_context.is_regeneration = True + retry_context.failure_patterns = state.getFailurePatterns() + retry_context.failed_actions = state.failed_actions + retry_context.successful_actions = state.successful_actions + continue else: logger.error(f"=== TASK {task_index or '?'} FAILED: {task_step.objective} after {attempt+1} attempts ===") @@ -395,7 +718,11 @@ class HandlingTasks: "actionMethod": "task", "actionName": "task_retry", "documentsLabel": None, - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": 1, # Task retry is always round 1 + "taskNumber": task_index, + "actionNumber": 0 } try: @@ -442,7 +769,11 @@ class HandlingTasks: "actionMethod": "task", "actionName": "task_failure", "documentsLabel": None, - "documents": [] + "documents": [], + # Add workflow context fields + "roundNumber": 1, # Task failure is always round 1 + "taskNumber": task_index, + "actionNumber": 0 } try: @@ -468,10 +799,11 @@ class HandlingTasks: # Check workflow status before reviewing task completion self._checkWorkflowStopped() + # Create proper context object for result review review_context = ReviewContext( task_step=task_step, + task_actions=task_actions, action_results=action_results, - workflow=workflow, step_result={ 'successful_actions': sum(1 for result in action_results if result.success), 'total_actions': len(action_results), @@ -480,18 +812,21 @@ class HandlingTasks: 'documents': [ { 'action_index': i, - 'documents_count': len(result.documents) if hasattr(result, 'documents') and result.documents else 0, - 'documents': result.documents if hasattr(result, 'documents') and result.documents else [] + 'documents_count': len(result.documents) if result.documents else 0, + 'documents': result.documents if result.documents else [] } for i, result in enumerate(action_results) ] - } + }, + workflow_id=workflow.id, + previous_results=[] ) + # Check workflow status before calling AI service self._checkWorkflowStopped() # Use promptFactory for review prompt - prompt = await createResultReviewPrompt(review_context) + prompt = createResultReviewPrompt(review_context, self.service) response = await self.service.callAiTextAdvanced(prompt) # Inline parseReviewResponse logic here json_start = response.find('{') @@ -535,7 +870,9 @@ class HandlingTasks: missing_outputs=[], met_criteria=met_criteria, unmet_criteria=unmet_criteria, - confidence=review.get('confidence', 0.5) + confidence=review.get('confidence', 0.5), + # Extract user-friendly message if available + userMessage=review.get('userMessage', None) ) # Enhanced validation logging @@ -566,20 +903,14 @@ class HandlingTasks: self._checkWorkflowStopped() # Log handover status summary - if hasattr(review_result, 'status'): - status = review_result.status - if hasattr(review_result, 'met_criteria'): - met = review_result.met_criteria - else: - met = [] - - + status = review_result.status if review_result else 'unknown' + met = review_result.met_criteria if review_result and review_result.met_criteria else [] handover_data = { 'task_id': task_step.id, 'task_description': task_step.objective, 'actions': [action.to_dict() for action in task_actions], - 'review_result': review_result.to_dict() if hasattr(review_result, 'to_dict') else review_result, + 'review_result': review_result.to_dict(), 'workflow_id': workflow.id, 'handover_time': get_utc_timestamp() } @@ -589,6 +920,53 @@ class HandlingTasks: logger.error(f"Error in prepareTaskHandover: {str(e)}") return {'error': str(e)} + def createTaskAction(self, actionData: Dict[str, Any]) -> 'TaskAction': + """Creates a new task action.""" + try: + # Ensure ID is present + if "id" not in actionData or not actionData["id"]: + actionData["id"] = f"action_{uuid.uuid4()}" + + # Ensure required fields + if "status" not in actionData: + actionData["status"] = TaskStatus.PENDING + + if "execMethod" not in actionData: + logger.error("execMethod is required for task action") + return None + + if "execAction" not in actionData: + logger.error("execAction is required for task action") + return None + + if "execParameters" not in actionData: + actionData["execParameters"] = {} + + # Create action in database + createdAction = self.chatInterface.db.recordCreate("taskActions", actionData) + + # Convert to TaskAction model + return TaskAction( + id=createdAction["id"], + execMethod=createdAction["execMethod"], + execAction=createdAction["execAction"], + execParameters=createdAction.get("execParameters", {}), + execResultLabel=createdAction.get("execResultLabel"), + expectedDocumentFormats=createdAction.get("expectedDocumentFormats"), + status=createdAction.get("status", TaskStatus.PENDING), + error=createdAction.get("error"), + retryCount=createdAction.get("retryCount", 0), + retryMax=createdAction.get("retryMax", 3), + processingTime=createdAction.get("processingTime"), + timestamp=float(createdAction.get("timestamp", get_utc_timestamp())), + result=createdAction.get("result"), + resultDocuments=createdAction.get("resultDocuments", []) + ) + + except Exception as e: + logger.error(f"Error creating task action: {str(e)}") + return None + # --- Helper action handling methods --- async def executeSingleAction(self, action, workflow, task_step, task_index=None, action_index=None, total_actions=None): @@ -638,9 +1016,9 @@ class HandlingTasks: if result.documents and len(result.documents) > 0: # Try to get text content from the first document first_doc = result.documents[0] - if hasattr(first_doc, 'documentData') and isinstance(first_doc.documentData, dict): + if isinstance(first_doc.documentData, dict): action.result = first_doc.documentData.get("result", "") - elif hasattr(first_doc, 'documentData') and isinstance(first_doc.documentData, str): + elif isinstance(first_doc.documentData, str): action.result = first_doc.documentData # Preserve the action's execResultLabel for document routing # Action methods should NOT return resultLabel - this is managed by the action handler @@ -670,24 +1048,14 @@ class HandlingTasks: if created_documents: logger.info(f"Output documents ({len(created_documents)}):") for i, doc in enumerate(created_documents): - if hasattr(doc, 'filename'): - logger.info(f" {i+1}. {doc.filename}") - elif isinstance(doc, dict) and 'filename' in doc: - logger.info(f" {i+1}. {doc['filename']}") - else: - logger.info(f" {i+1}. {type(doc).__name__}") + logger.info(f" {i+1}. {doc.fileName}") # Log document details for debugging logger.info("Document details:") for i, doc in enumerate(created_documents): - if hasattr(doc, 'filename'): - logger.info(f" Doc {i+1}: filename={doc.filename}, type={type(doc)}") - if hasattr(doc, 'id'): - logger.info(f" ID: {doc.id}") - if hasattr(doc, 'fileId'): - logger.info(f" File ID: {doc.fileId}") - elif isinstance(doc, dict): - logger.info(f" Doc {i+1}: dict with keys: {list(doc.keys())}") + logger.info(f" Doc {i+1}: fileName={doc.fileName}, type={type(doc)}") + logger.info(f" ID: {doc.id}") + logger.info(f" File ID: {doc.fileId}") else: logger.info("Output: No documents created") else: @@ -716,7 +1084,7 @@ class HandlingTasks: # Preserve the original documents field from the method result # This ensures the standard document format is maintained - original_documents = result.documents if hasattr(result, 'documents') else [] + original_documents = result.documents # Extract result text from documents if available result_text = self._extractResultText(result) @@ -756,23 +1124,95 @@ class HandlingTasks: else: logger.info(f"Result label: {result_label} - No documents") + # Get current workflow context and stats + workflow_context = self.service.getWorkflowContext() + workflow_stats = self.service.getWorkflowStats() + # Create a more meaningful message that includes task context task_objective = task_step.objective if task_step else 'Unknown task' # Build a user-friendly message based on success/failure if result.success: if created_documents and len(created_documents) > 0: - doc_names = [doc.filename if hasattr(doc, 'filename') else str(doc) for doc in created_documents[:3]] + doc_names = [doc.fileName for doc in created_documents[:3]] if len(created_documents) > 3: doc_names.append(f"... and {len(created_documents) - 3} more") - message_text = f"✅ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} completed\n\nObjective: {task_objective}\n\nGenerated {len(created_documents)} document(s): {', '.join(doc_names)}" + # Enhanced message with workflow context + message_text = f"✅ **Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} Completed**\n\n" + message_text += f"**Objective:** {task_objective}\n\n" + message_text += f"**Generated {len(created_documents)} document(s):** {', '.join(doc_names)}\n\n" + message_text += f"**Result Label:** {result_label}\n" + + # Add comprehensive workflow context + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}" else: - message_text = f"✅ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} completed\n\nObjective: {task_objective}\n\nAction executed successfully" + message_text = f"✅ **Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} Completed**\n\n" + message_text += f"**Objective:** {task_objective}\n\n" + message_text += "**Action executed successfully**\n\n" + message_text += f"**Result Label:** {result_label}\n" + + # Add comprehensive workflow context + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}" else: # ⚠️ FAILURE MESSAGE - Show error details to user error_details = result.error if result.error else "Unknown error occurred" - message_text = f"❌ Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} failed\n\nObjective: {task_objective}\n\nError: {error_details}\n\nPlease check the connection and try again." + message_text = f"❌ **Task {task_index or '?'} - Action {action.execMethod}.{action.execAction} Failed**\n\n" + message_text += f"**Objective:** {task_objective}\n\n" + message_text += f"**Error:** {error_details}\n\n" + message_text += f"**Result Label:** {result_label}\n" + + # Add comprehensive workflow context + current_round = workflow_context.get('currentRound', 1) + current_task = workflow_context.get('currentTask', 0) + total_tasks = workflow_stats.get('totalTasks', 0) + current_action = workflow_context.get('currentAction', 0) + total_actions = workflow_stats.get('totalActions', 0) + + message_text += f"**Workflow Context:**\n" + message_text += f"- Round: {current_round}\n" + if total_tasks > 0: + message_text += f"- Task: {current_task}/{total_tasks}\n" + else: + message_text += f"- Task: {current_task}\n" + if total_actions > 0: + message_text += f"- Action: {current_action}/{total_actions}\n" + else: + message_text += f"- Action: {current_action}\n" + message_text += f"- Status: {workflow_stats.get('workflowStatus', 'unknown')}\n\n" + message_text += "Please check the connection and try again." message_data = { "workflowId": workflow.id, @@ -785,9 +1225,20 @@ class HandlingTasks: "actionMethod": action.execMethod, "actionName": action.execAction, "documentsLabel": result_label, - "documents": created_documents + "documents": created_documents, + # Add workflow context fields + "roundNumber": workflow_context.get('currentRound', 1), + "taskNumber": task_index, + "actionNumber": workflow_context.get('currentAction', 0) } + # Add user-friendly message if available + if action.userMessage: + if result.success: + message_data["message"] += f"\n\n💬 {action.userMessage}" + else: + message_data["message"] += f"\n\n💬 Action was intended to: {action.userMessage}" + # Add debugging for error messages if not result.success: logger.info(f"Creating ERROR message: {message_text}") @@ -884,8 +1335,8 @@ class HandlingTasks: logger.error(f"Action {i} missing required fields: {missing_fields}") return False result_label = action.get('resultLabel', '') - if not result_label.startswith('task'): - logger.error(f"Action {i} result label must start with 'task': {result_label}") + if not result_label.startswith('round'): + logger.error(f"Action {i} result label must start with 'round': {result_label}") return False parameters = action.get('parameters', {}) if not isinstance(parameters, dict): @@ -904,9 +1355,8 @@ class HandlingTasks: # Try to get text content from the first document first_doc = result.documents[0] - if hasattr(first_doc, 'documentData') and isinstance(first_doc.documentData, dict): + if isinstance(first_doc.documentData, dict): return first_doc.documentData.get("result", "") - elif hasattr(first_doc, 'documentData') and isinstance(first_doc.documentData, str): + elif isinstance(first_doc.documentData, str): return first_doc.documentData - else: - return "" \ No newline at end of file + return "" \ No newline at end of file diff --git a/modules/chat/handling/promptFactory.py b/modules/chat/handling/promptFactory.py index 32f35cc4..1bf55c9b 100644 --- a/modules/chat/handling/promptFactory.py +++ b/modules/chat/handling/promptFactory.py @@ -4,19 +4,29 @@ import json import logging from typing import Any, Dict +from modules.interfaces.interfaceChatModel import TaskContext, ReviewContext # Set up logger logger = logging.getLogger(__name__) # Prompt creation helpers extracted from managerChat.py -def createTaskPlanningPrompt(context: Dict[str, Any]) -> str: - """Create prompt for task planning""" - return f"""You are a task planning AI that analyzes user requests and creates structured task plans. +def createTaskPlanningPrompt(context: TaskContext, service) -> str: + """Create enhanced prompt for task planning with user-friendly message generation""" + # Get user language directly from service.user.language + user_language = service.user.language if service and service.user else 'en' + + # Extract user request from context - use Pydantic model directly + user_request = context.task_step.objective if context.task_step else 'No request specified' + + # Extract available documents from context - use Pydantic model directly + available_documents = context.available_documents or [] + + return f"""You are a task planning AI that analyzes user requests and creates structured task plans with user-friendly feedback messages. -USER REQUEST: {context['user_request']} +USER REQUEST: {user_request} -AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])} +AVAILABLE DOCUMENTS: {', '.join(available_documents)} INSTRUCTIONS: 1. Analyze the user request and available documents @@ -24,7 +34,8 @@ INSTRUCTIONS: 3. Focus on business outcomes, not technical operations 4. Each task should produce meaningful, usable outputs 5. Ensure proper handover between tasks using result labels -6. Return a JSON object with the exact structure shown below +6. Generate user-friendly messages for each task in the user's language ({user_language}) +7. Return a JSON object with the exact structure shown below TASK PLANNING PRINCIPLES: - Break down complex requests into logical, sequential steps @@ -32,51 +43,45 @@ TASK PLANNING PRINCIPLES: - Keep tasks at a meaningful level of abstraction - Each task should produce results that can be used by subsequent tasks - Ensure clear dependencies and handovers between tasks +- Provide clear, actionable user messages in the user's language ({user_language}) REQUIRED JSON STRUCTURE: {{ - \"overview\": \"Brief description of the overall plan\", - \"tasks\": [ + "overview": "Brief description of the overall plan", + "userMessage": "User-friendly message explaining the task plan in {user_language}", + "tasks": [ {{ - \"id\": \"task_1\", - \"objective\": \"Clear business objective this task accomplishes\", - \"dependencies\": [\"task_0\"], // IDs of tasks that must complete first - \"success_criteria\": [\"criteria1\", \"criteria2\"], - \"estimated_complexity\": \"low|medium|high\" + "id": "task_1", + "objective": "Clear business objective this task accomplishes", + "dependencies": ["task_0"], // IDs of tasks that must complete first + "success_criteria": ["criteria1", "criteria2"], + "estimated_complexity": "low|medium|high", + "userMessage": "User-friendly message explaining what this task will accomplish in {user_language}" }} ] }} EXAMPLES OF GOOD TASK OBJECTIVES: -- \"Analyze documents and extract key insights for business communication\" -- \"Create professional business communication incorporating analyzed information\" -- \"Execute business communication using specified channels\" -- \"Document and store all business communication outcomes\" +- "Analyze documents and extract key insights for business communication" +- "Create professional business communication incorporating analyzed information" +- "Execute business communication using specified channels" +- "Document and store all business communication outcomes" EXAMPLES OF GOOD SUCCESS CRITERIA: -- \"Key insights extracted and ready for business use\" -- \"Professional communication created with clear business value\" -- \"Business communication successfully delivered\" -- \"All outcomes properly documented and accessible\" +- "Key insights extracted and ready for business use" +- "Professional communication created with clear business value" +- "Business communication successfully delivered" +- "All outcomes properly documented and accessible" EXAMPLES OF BAD TASK OBJECTIVES: -- \"Read the PDF file\" (too granular - should be \"Analyze document content\") -- \"Convert data to CSV\" (implementation detail - should be \"Structure data for analysis\") -- \"Send email\" (too specific - should be \"Deliver business communication\") +- "Read the PDF file" (too granular - should be "Analyze document content") +- "Convert data to CSV" (implementation detail - should be "Structure data for analysis") +- "Send email" (too specific - should be "Deliver business communication") NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" -async def createActionDefinitionPrompt(context, service) -> str: - """Create prompt for action generation with enhanced document extraction guidance and retry context""" - task_step = context.task_step - workflow = context.workflow - available_docs = context.available_documents or [] - previous_results = context.previous_results or [] - improvements = context.improvements or [] - retry_count = context.retry_count or 0 - previous_action_results = context.previous_action_results or [] - previous_review_result = context.previous_review_result - previous_handover = getattr(context, 'previous_handover', None) +async def createActionDefinitionPrompt(context: TaskContext, service) -> str: + """Create enhanced prompt for action generation with user-friendly messages and enhanced document context""" methodList = service.getMethodsList() method_actions = {} for sig in methodList: @@ -84,9 +89,12 @@ async def createActionDefinitionPrompt(context, service) -> str: method, rest = sig.split('.', 1) action = rest.split('(')[0] method_actions.setdefault(method, []).append((action, sig)) - messageSummary = await service.summarizeChat(workflow.messages) - # Get ALL documents from the entire workflow, not just current round - docRefs = service.getDocumentReferenceList() + + messageSummary = await service.summarizeChat(context.workflow.messages) if context.workflow else "" + + # Get enhanced document context using the new method + available_documents_str = service.getEnhancedDocumentContext() + connRefs = service.getConnectionReferenceList() # Debug logging for connections @@ -94,82 +102,60 @@ async def createActionDefinitionPrompt(context, service) -> str: logging.debug(f"Connection references type: {type(connRefs)}") logging.debug(f"Connection references length: {len(connRefs) if connRefs else 0}") - # Get documents from current round (chat) and entire workflow history - current_round_docs = docRefs.get('chat', []) - workflow_history_docs = docRefs.get('history', []) - - # Combine all documents, prioritizing current round first, then workflow history - all_doc_refs = current_round_docs + workflow_history_docs - # Log document availability for debugging - logging.debug(f"Document references - Current round: {len(current_round_docs)}, Workflow history: {len(workflow_history_docs)}, Total: {len(all_doc_refs)}") + logging.debug(f"Enhanced document context length: {len(available_documents_str)}") + available_methods_str = '' for method, actions in method_actions.items(): available_methods_str += f"- {method}:\n" for action, sig in actions: available_methods_str += f" - {action}: {sig}\n" + retry_context = "" - if retry_count > 0: + if context.retry_count and context.retry_count > 0: retry_context = f""" -RETRY CONTEXT (Attempt {retry_count}): +RETRY CONTEXT (Attempt {context.retry_count}): Previous action results that failed or were incomplete: """ - for i, result in enumerate(previous_action_results): + for i, result in enumerate(context.previous_action_results or []): retry_context += f"- Action {i+1}: ActionResult\n" retry_context += f" Status: {result.success and 'success' or 'failed'}\n" retry_context += f" Error: {result.error or 'None'}\n" # Check if result has documents and show document info - if hasattr(result, 'documents') and result.documents: + if result.documents: doc_info = f"Documents: {len(result.documents)} document(s)" if result.documents[0].documentName: doc_info += f" - {result.documents[0].documentName}" retry_context += f" {doc_info}\n" else: retry_context += f" Documents: None\n" - if previous_review_result: + + if context.previous_review_result: retry_context += f""" Previous review feedback: -- Status: {previous_review_result.status or 'unknown'} -- Reason: {previous_review_result.reason or 'No reason provided'} -- Quality Score: {previous_review_result.quality_score or 0}/10 -- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])} +- Status: {context.previous_review_result.get('status', 'unknown') or 'unknown'} +- Reason: {context.previous_review_result.get('reason', 'No reason provided') or 'No reason provided'} +- Quality Score: {context.previous_review_result.get('quality_score', 0) or 0}/10 +- Unmet Criteria: {', '.join(context.previous_review_result.get('unmet_criteria', []) or [])} """ - success_criteria_str = ', '.join(task_step.success_criteria or []) - previous_results_str = ', '.join(previous_results) if previous_results else 'None' - improvements_str = str(improvements) if improvements else 'None' - available_connections_str = '\n'.join(f"- {conn}" for conn in connRefs) - # Build comprehensive document list showing both current round and workflow history - if all_doc_refs: - available_documents_str = "CURRENT ROUND DOCUMENTS:\n" - if current_round_docs: - for doc in current_round_docs: - available_documents_str += f"- {doc.documentsLabel} contains {', '.join(doc.documents)}\n" - else: - available_documents_str += "- No documents in current round\n" - - available_documents_str += "\nWORKFLOW HISTORY DOCUMENTS:\n" - if workflow_history_docs: - for doc in workflow_history_docs: - available_documents_str += f"- {doc.documentsLabel} contains {', '.join(doc.documents)}\n" - else: - available_documents_str += "- No documents in workflow history\n" - else: - available_documents_str = "NO DOCUMENTS AVAILABLE - This workflow has no documents to process." - # Debug logging for document availability - logging.debug(f"Available documents string length: {len(available_documents_str)}") - logging.debug(f"Current round docs count: {len(current_round_docs)}") - logging.debug(f"Workflow history docs count: {len(workflow_history_docs)}") - logging.debug(f"Total doc refs: {len(all_doc_refs)}") + # Use Pydantic model directly - no need for getattr + success_criteria_str = ', '.join(context.task_step.success_criteria) if context.task_step and context.task_step.success_criteria else 'No criteria specified' + previous_results_str = ', '.join(context.previous_results) if context.previous_results else 'None' + improvements_str = str(context.improvements) if context.improvements else 'None' + available_connections_str = '\n'.join(f"- {conn}" for conn in connRefs) + + # Get user language from service - this is the correct way + user_language = service.user.language if service and service.user else 'en' prompt = f""" -You are an action generation AI that creates specific actions to accomplish a task step. +You are an action generation AI that creates specific actions to accomplish a task step with user-friendly messages. DOCUMENT REFERENCE TYPES: -- docItem: Reference to a single document. Format: "docItem::" -- docList: Reference to a group of documents under a label. Format: