From 4e15be8296729fcde3990630da40b9684e4f8329 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Tue, 22 Jul 2025 18:15:02 +0200 Subject: [PATCH] refactored document handling --- modules/chat/documents/documentCreation.py | 124 ----- ...entProcessing.py => documentExtraction.py} | 40 +- modules/chat/documents/documentGeneration.py | 163 ++++++ modules/chat/documents/documentUtility.py | 132 +++++ modules/chat/handling/handlingActions.py | 466 +++++------------- modules/chat/handling/handlingTasks.py | 12 +- modules/chat/serviceCenter.py | 29 +- notes/changelog.txt | 6 - notes/methodbased_specification.md | 4 +- 9 files changed, 483 insertions(+), 493 deletions(-) delete mode 100644 modules/chat/documents/documentCreation.py rename modules/chat/documents/{documentProcessing.py => documentExtraction.py} (96%) create mode 100644 modules/chat/documents/documentGeneration.py create mode 100644 modules/chat/documents/documentUtility.py diff --git a/modules/chat/documents/documentCreation.py b/modules/chat/documents/documentCreation.py deleted file mode 100644 index 49e16580..00000000 --- a/modules/chat/documents/documentCreation.py +++ /dev/null @@ -1,124 +0,0 @@ -# Contains all document creation functions extracted from managerChat.py - -import logging -import json -from typing import Dict, Any, Optional, List, Union -from datetime import datetime, UTC - -class DocumentCreator: - def __init__(self, service): - self.service = service - - def getFileExtension(self, filename: str) -> str: - """Extract file extension from filename""" - return self.service.getFileExtension(filename) - - def getMimeType(self, extension: str) -> str: - """Get MIME type based on file extension""" - return self.service.getMimeTypeFromExtension(extension) - - def detectMimeTypeFromContent(self, content: Any, filename: str) -> str: - """ - Detect MIME type from content and filename using service center. - Only returns a detected MIME type if it's better than application/octet-stream. - """ - try: - if isinstance(content, str): - file_bytes = content.encode('utf-8') - elif isinstance(content, dict): - file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8') - else: - file_bytes = str(content).encode('utf-8') - detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) - if detected_mime_type != "application/octet-stream": - return detected_mime_type - return "application/octet-stream" - except Exception as e: - logging.warning(f"Error in MIME type detection for {filename}: {str(e)}") - return 'application/octet-stream' - - def detectMimeTypeFromDocument(self, document: Any, filename: str) -> str: - """ - Detect MIME type from document object using service center. - Only returns a detected MIME type if it's better than application/octet-stream. - """ - try: - content = getattr(document, 'content', '') - if isinstance(content, str): - file_bytes = content.encode('utf-8') - else: - file_bytes = str(content).encode('utf-8') - detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename) - if detected_mime_type != "application/octet-stream": - return detected_mime_type - return "application/octet-stream" - except Exception as e: - logging.warning(f"Error in MIME type detection for document {filename}: {str(e)}") - return 'application/octet-stream' - - def convertDocumentDataToString(self, document_data: Dict[str, Any], file_extension: str) -> str: - """Convert document data to string content based on file type with enhanced processing""" - try: - if document_data is None: - return "" - if isinstance(document_data, str): - return document_data - if isinstance(document_data, dict): - if file_extension == 'json': - return json.dumps(document_data, indent=2, ensure_ascii=False) - elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']: - text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data'] - for field in text_fields: - if field in document_data: - content = document_data[field] - if isinstance(content, str): - return content - elif isinstance(content, (dict, list)): - return json.dumps(content, indent=2, ensure_ascii=False) - return json.dumps(document_data, indent=2, ensure_ascii=False) - elif file_extension == 'csv': - csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text'] - for field in csv_fields: - if field in document_data: - content = document_data[field] - if isinstance(content, str): - return content - elif isinstance(content, list): - if content and isinstance(content[0], (list, dict)): - import csv - import io - output = io.StringIO() - if isinstance(content[0], dict): - if content: - fieldnames = content[0].keys() - writer = csv.DictWriter(output, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(content) - else: - writer = csv.writer(output) - writer.writerows(content) - return output.getvalue() - return json.dumps(document_data, indent=2, ensure_ascii=False) - else: - return json.dumps(document_data, indent=2, ensure_ascii=False) - elif isinstance(document_data, list): - if file_extension == 'csv': - import csv - import io - output = io.StringIO() - if document_data and isinstance(document_data[0], dict): - fieldnames = document_data[0].keys() - writer = csv.DictWriter(output, fieldnames=fieldnames) - writer.writeheader() - writer.writerows(document_data) - else: - writer = csv.writer(output) - writer.writerows(document_data) - return output.getvalue() - else: - return json.dumps(document_data, indent=2, ensure_ascii=False) - else: - return str(document_data) - except Exception as e: - logging.error(f"Error converting document data to string: {str(e)}") - return str(document_data) \ No newline at end of file diff --git a/modules/chat/documents/documentProcessing.py b/modules/chat/documents/documentExtraction.py similarity index 96% rename from modules/chat/documents/documentProcessing.py rename to modules/chat/documents/documentExtraction.py index 323c6f7f..8bd1a563 100644 --- a/modules/chat/documents/documentProcessing.py +++ b/modules/chat/documents/documentExtraction.py @@ -9,6 +9,13 @@ from pathlib import Path import xml.etree.ElementTree as ET from bs4 import BeautifulSoup import uuid +from .documentUtility import ( + getFileExtension, + getMimeTypeFromExtension, + detectMimeTypeFromContent, + detectMimeTypeFromData, + convertDocumentDataToString +) from modules.interfaces.interfaceChatModel import ( ExtractedContent, @@ -29,7 +36,7 @@ class FileProcessingError(Exception): """Custom exception for file processing errors.""" pass -class DocumentProcessor: +class DocumentExtraction: """Processor for handling document operations and content extraction.""" def __init__(self, serviceCenter=None): @@ -133,17 +140,13 @@ class DocumentProcessor: # Decode base64 if needed if base64Encoded: fileData = base64.b64decode(fileData) - - # Detect content type if needed + # Use documentUtility for mime type detection if mimeType == "application/octet-stream": - mimeType = self._serviceCenter.detectContentTypeFromData(fileData, filename) - + mimeType = detectMimeTypeFromData(fileData, filename, self._serviceCenter) # Process document based on type if mimeType not in self.supportedTypes: - # Fallback to binary processing contentItems = await self._processBinary(fileData, filename, mimeType) else: - # Process document based on type processor = self.supportedTypes[mimeType] contentItems = await processor(fileData, filename, mimeType) @@ -171,13 +174,15 @@ class DocumentProcessor: """Process text document""" try: content = fileData.decode('utf-8') + # Use documentUtility for mime type + mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) return [ContentItem( label="main", data=content, metadata=ContentMetadata( size=len(content.encode('utf-8')), pages=1, - mimeType="text/plain", + mimeType=mime_type, base64Encoded=False ) )] @@ -189,13 +194,14 @@ class DocumentProcessor: """Process CSV document""" try: content = fileData.decode('utf-8') + mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) return [ContentItem( label="main", data=content, metadata=ContentMetadata( size=len(content.encode('utf-8')), pages=1, - mimeType="text/csv", + mimeType=mime_type, base64Encoded=False ) )] @@ -207,16 +213,15 @@ class DocumentProcessor: """Process JSON document""" try: content = fileData.decode('utf-8') - # Parse JSON to validate jsonData = json.loads(content) - + mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) return [ContentItem( label="main", data=content, metadata=ContentMetadata( size=len(content.encode('utf-8')), pages=1, - mimeType="application/json", + mimeType=mime_type, base64Encoded=False ) )] @@ -228,13 +233,14 @@ class DocumentProcessor: """Process XML document""" try: content = fileData.decode('utf-8') + mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) return [ContentItem( label="main", data=content, metadata=ContentMetadata( size=len(content.encode('utf-8')), pages=1, - mimeType="application/xml", + mimeType=mime_type, base64Encoded=False ) )] @@ -246,13 +252,14 @@ class DocumentProcessor: """Process HTML document""" try: content = fileData.decode('utf-8') + mime_type = getMimeTypeFromExtension(getFileExtension(filename), self._serviceCenter) return [ContentItem( label="main", data=content, metadata=ContentMetadata( size=len(content.encode('utf-8')), pages=1, - mimeType="text/html", + mimeType=mime_type, base64Encoded=False ) )] @@ -264,15 +271,14 @@ class DocumentProcessor: """Process SVG document""" try: content = fileData.decode('utf-8') - # Check if it's actually SVG isSvg = " List[Dict[str, Any]]: + """ + Main function to process documents from an action result. + Returns a list of processed document dictionaries. + """ + try: + documents = action_result.data.get("documents", []) + processed_documents = [] + for doc in documents: + processed_doc = self.processSingleDocument(doc, action) + if processed_doc: + processed_documents.append(processed_doc) + return processed_documents + except Exception as e: + logger.error(f"Error processing action result documents: {str(e)}") + return [] + + def processSingleDocument(self, doc: Any, action) -> Optional[Dict[str, Any]]: + """Process a single document from action result""" + try: + if hasattr(doc, 'filename') and doc.filename: + # Document object with filename attribute + mime_type = getattr(doc, 'mimeType', 'application/octet-stream') + if mime_type == "application/octet-stream": + content = getattr(doc, 'content', '') + mime_type = detectMimeTypeFromContent(content, doc.filename, self.service) + return { + 'filename': doc.filename, + 'fileSize': getattr(doc, 'fileSize', 0), + 'mimeType': mime_type, + 'content': getattr(doc, 'content', ''), + 'document': doc + } + elif isinstance(doc, dict): + # Dictionary format document + filename = doc.get('documentName', doc.get('filename', \ + f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}")) + fileSize = doc.get('fileSize', len(str(doc.get('documentData', '')))) + mimeType = doc.get('mimeType', 'application/octet-stream') + if mimeType == "application/octet-stream": + document_data = doc.get('documentData', '') + mimeType = detectMimeTypeFromContent(document_data, filename, self.service) + return { + 'filename': filename, + 'fileSize': fileSize, + 'mimeType': mimeType, + 'content': doc.get('documentData', ''), + 'document': doc + } + else: + # Unknown document type + logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}") + filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}" + mimeType = detectMimeTypeFromContent(doc, filename, self.service) + return { + 'filename': filename, + 'fileSize': 0, + 'mimeType': mimeType, + 'content': str(doc), + 'document': doc + } + except Exception as e: + logger.error(f"Error processing single document: {str(e)}") + return None + + def createDocumentsFromActionResult(self, action_result, action, workflow) -> List[Any]: + """ + Create actual document objects from action result and store them in the system. + Returns a list of created document objects. + """ + try: + processed_docs = self.processActionResultDocuments(action_result, action, workflow) + created_documents = [] + for doc_data in processed_docs: + try: + document_name = doc_data['filename'] + document_data = doc_data['content'] + mime_type = doc_data['mimeType'] + # Convert document data to string content + content = convertDocumentDataToString(document_data, getFileExtension(document_name)) + # Skip empty or minimal content + minimal_content_patterns = ['{}', '[]', 'null', '""', "''"] + if not content or content.strip() == "" or content.strip() in minimal_content_patterns: + logger.warning(f"Empty or minimal content for document {document_name}, skipping") + continue + # Create file in system + file_id = self.service.createFile( + fileName=document_name, + mimeType=mime_type, + content=content, + base64encoded=False + ) + if not file_id: + logger.error(f"Failed to create file for document {document_name}") + continue + # Create document object + document = self.service.createDocument( + fileName=document_name, + mimeType=mime_type, + content=content, + base64encoded=False + ) + if document: + created_documents.append(document) + logger.info(f"Created document: {document_name} with file ID: {file_id} and MIME type: {mime_type}") + else: + logger.error(f"Failed to create ChatDocument object for {document_name}") + except Exception as e: + logger.error(f"Error creating document {doc_data.get('filename', 'unknown')}: {str(e)}") + continue + return created_documents + except Exception as e: + logger.error(f"Error creating documents from action result: {str(e)}") + return [] + + @staticmethod + def get_delivered_files_and_formats(documents): + delivered_files = [] + delivered_formats = [] + for doc in documents: + if hasattr(doc, 'filename'): + delivered_files.append(doc.filename) + file_extension = getFileExtension(doc.filename) + mime_type = getattr(doc, 'mimeType', 'application/octet-stream') + delivered_formats.append({ + 'filename': doc.filename, + 'extension': file_extension, + 'mimeType': mime_type + }) + elif isinstance(doc, dict) and 'filename' in doc: + delivered_files.append(doc['filename']) + file_extension = getFileExtension(doc['filename']) + mime_type = doc.get('mimeType', 'application/octet-stream') + delivered_formats.append({ + 'filename': doc['filename'], + 'extension': file_extension, + 'mimeType': mime_type + }) + else: + delivered_files.append(f"document_{len(delivered_files)}") + delivered_formats.append({ + 'filename': f"document_{len(delivered_files)}", + 'extension': 'unknown', + 'mimeType': 'application/octet-stream' + }) + return delivered_files, delivered_formats \ No newline at end of file diff --git a/modules/chat/documents/documentUtility.py b/modules/chat/documents/documentUtility.py new file mode 100644 index 00000000..1b74b475 --- /dev/null +++ b/modules/chat/documents/documentUtility.py @@ -0,0 +1,132 @@ +import json +import logging +from typing import Any, Dict + +logger = logging.getLogger(__name__) + +def getFileExtension(filename: str) -> str: + """Extract file extension from filename""" + if '.' in filename: + return filename.rsplit('.', 1)[-1].lower() + return '' + +def getMimeTypeFromExtension(extension: str, service=None) -> str: + """Get MIME type based on file extension. Optionally use a service for mapping.""" + if service: + return service.getMimeTypeFromExtension(extension) + # Fallback mapping + mapping = { + 'txt': 'text/plain', + 'md': 'text/markdown', + 'html': 'text/html', + 'css': 'text/css', + 'js': 'application/javascript', + 'json': 'application/json', + 'csv': 'text/csv', + 'xml': 'application/xml', + 'py': 'text/x-python', + 'pdf': 'application/pdf', + 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + 'png': 'image/png', + 'jpg': 'image/jpeg', + 'jpeg': 'image/jpeg', + 'gif': 'image/gif', + 'svg': 'image/svg+xml', + } + return mapping.get(extension.lower(), 'application/octet-stream') + +def detectMimeTypeFromData(file_bytes: bytes, filename: str, service=None) -> str: + """Detect MIME type from file bytes and filename using a service if provided.""" + try: + if service: + detected = service.detectContentTypeFromData(file_bytes, filename) + if detected and detected != 'application/octet-stream': + return detected + # Fallback: guess from extension + ext = getFileExtension(filename) + return getMimeTypeFromExtension(ext, service) + except Exception as e: + logger.warning(f"Error in MIME type detection for {filename}: {str(e)}") + return 'application/octet-stream' + +def detectMimeTypeFromContent(content: Any, filename: str, service=None) -> str: + """Detect MIME type from content and filename using a service if provided.""" + try: + if isinstance(content, str): + file_bytes = content.encode('utf-8') + elif isinstance(content, dict): + file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8') + else: + file_bytes = str(content).encode('utf-8') + return detectMimeTypeFromData(file_bytes, filename, service) + except Exception as e: + logger.warning(f"Error in MIME type detection for {filename}: {str(e)}") + return 'application/octet-stream' + +def convertDocumentDataToString(document_data: Any, file_extension: str) -> str: + """Convert document data to string content based on file type with enhanced processing.""" + try: + if document_data is None: + return "" + if isinstance(document_data, str): + return document_data + if isinstance(document_data, dict): + if file_extension == 'json': + return json.dumps(document_data, indent=2, ensure_ascii=False) + elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']: + text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data'] + for field in text_fields: + if field in document_data: + content = document_data[field] + if isinstance(content, str): + return content + elif isinstance(content, (dict, list)): + return json.dumps(content, indent=2, ensure_ascii=False) + return json.dumps(document_data, indent=2, ensure_ascii=False) + elif file_extension == 'csv': + csv_fields = ['table_data', 'csv_data', 'rows', 'data', 'content', 'text'] + for field in csv_fields: + if field in document_data: + content = document_data[field] + if isinstance(content, str): + return content + elif isinstance(content, list): + if content and isinstance(content[0], (list, dict)): + import csv + import io + output = io.StringIO() + if isinstance(content[0], dict): + if content: + fieldnames = content[0].keys() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(content) + else: + writer = csv.writer(output) + writer.writerows(content) + return output.getvalue() + return json.dumps(document_data, indent=2, ensure_ascii=False) + else: + return json.dumps(document_data, indent=2, ensure_ascii=False) + elif isinstance(document_data, list): + if file_extension == 'csv': + import csv + import io + output = io.StringIO() + if document_data and isinstance(document_data[0], dict): + fieldnames = document_data[0].keys() + writer = csv.DictWriter(output, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(document_data) + else: + writer = csv.writer(output) + writer.writerows(document_data) + return output.getvalue() + else: + return json.dumps(document_data, indent=2, ensure_ascii=False) + else: + return str(document_data) + except Exception as e: + logger.error(f"Error converting document data to string: {str(e)}") + return str(document_data) \ No newline at end of file diff --git a/modules/chat/handling/handlingActions.py b/modules/chat/handling/handlingActions.py index 9d897703..7723ae25 100644 --- a/modules/chat/handling/handlingActions.py +++ b/modules/chat/handling/handlingActions.py @@ -7,8 +7,8 @@ import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC from modules.interfaces.interfaceChatModel import ReviewResult, ActionResult -from modules.chat.documents.documentCreation import DocumentCreator from .promptFactory import createResultReviewPrompt +from modules.chat.documents.documentGeneration import DocumentGenerator logger = logging.getLogger(__name__) @@ -16,12 +16,72 @@ class HandlingActions: def __init__(self, service, chatInterface): self.service = service self.chatInterface = chatInterface - self.documentCreator = DocumentCreator(self.service) + self.documentGenerator = DocumentGenerator(service) + + async def executeSingleAction(self, action, workflow): + """Execute a single action and return ActionResult with enhanced document processing""" + try: + enhanced_parameters = action.execParameters.copy() + if action.expectedDocumentFormats: + enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats + logger.info(f"Action {action.execMethod}.{action.execAction} expects formats: {action.expectedDocumentFormats}") + result = await self.service.executeAction( + methodName=action.execMethod, + actionName=action.execAction, + parameters=enhanced_parameters + ) + result_label = action.execResultLabel + if result.success: + action.setSuccess() + action.result = result.data.get("result", "") + action.execResultLabel = result_label + await self.createActionMessage(action, result, workflow, result_label) + else: + action.setError(result.error or "Action execution failed") + processed_documents = self.documentGenerator.processActionResultDocuments(result, action, workflow) + return ActionResult( + success=result.success, + data={ + "result": result.data.get("result", ""), + "documents": processed_documents, + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + validation=[], + error=result.error or "" + ) + except Exception as e: + logger.error(f"Error executing single action: {str(e)}") + action.setError(str(e)) + return ActionResult( + success=False, + data={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [] + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction + }, + validation=[], + error=str(e) + ) async def validateActionResult(self, action_result, action, context) -> dict: try: prompt = self._createGenericValidationPrompt(action_result, action, context) - response = await self._callAIWithCircuitBreaker(prompt, "action_validation") + response = await self.service.callAiTextAdvanced(prompt, "action_validation") validation = self._parseValidationResponse(response) validation['action_id'] = action.id validation['action_method'] = action.execMethod @@ -41,6 +101,73 @@ class HandlingActions: 'result_label': action.execResultLabel } + async def createActionMessage(self, action, result, workflow, result_label=None): + """Create and store a message for the action result in the workflow with enhanced document processing""" + try: + if result_label is None: + result_label = action.execResultLabel + message_data = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"Executed action {action.execMethod}.{action.execAction}", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": datetime.now(UTC).isoformat(), + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documentsLabel": result_label, + "documents": [] + } + # Use the local createDocumentsFromActionResult method + created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow) + message_data["documents"] = created_documents + message = self.chatInterface.createWorkflowMessage(message_data) + if message: + workflow.messages.append(message) + logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(created_documents)} documents") + logger.debug(f"WORKFLOW STATE after createActionMessage: id={id(workflow)}, message_count={len(workflow.messages)}") + for idx, msg in enumerate(workflow.messages): + label = getattr(msg, 'documentsLabel', None) + docs = getattr(msg, 'documents', None) + logger.debug(f" Message {idx}: label='{label}', documents_count={len(docs) if docs else 0}") + else: + logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}") + except Exception as e: + logger.error(f"Error creating action message: {str(e)}") + + def parseActionResponse(self, response: str) -> list: + try: + json_start = response.find('{') + json_end = response.rfind('}') + 1 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON found in response") + json_str = response[json_start:json_end] + action_data = json.loads(json_str) + if 'actions' not in action_data: + raise ValueError("Action response missing 'actions' field") + return action_data['actions'] + except Exception as e: + logger.error(f"Error parsing action response: {str(e)}") + return [] + + def parseReviewResponse(self, response: str) -> dict: + try: + json_start = response.find('{') + json_end = response.rfind('}') + 1 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON found in response") + json_str = response[json_start:json_end] + review = json.loads(json_str) + if 'status' not in review: + raise ValueError("Review response missing 'status' field") + return review + except Exception as e: + logger.error(f"Error parsing review response: {str(e)}") + return {'status': 'failed', 'reason': f'Parse error: {str(e)}'} + + # Internal helper methods + def _createGenericValidationPrompt(self, action_result, action, context) -> str: success = action_result.success result_data = action_result.data @@ -54,35 +181,9 @@ class HandlingActions: expected_document_formats = action.expectedDocumentFormats or [] actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else "" result_label_match = actual_result_label == expected_result_label - delivered_files = [] - delivered_formats = [] + # Use DocumentGenerator for file/format extraction + delivered_files, delivered_formats = DocumentGenerator.get_delivered_files_and_formats(documents) content_items = [] - for doc in documents: - if hasattr(doc, 'filename'): - delivered_files.append(doc.filename) - file_extension = self._getFileExtension(doc.filename) - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc.filename, - 'extension': file_extension, - 'mimeType': mime_type - }) - elif isinstance(doc, dict) and 'filename' in doc: - delivered_files.append(doc['filename']) - file_extension = self._getFileExtension(doc['filename']) - mime_type = doc.get('mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc['filename'], - 'extension': file_extension, - 'mimeType': mime_type - }) - else: - delivered_files.append(f"document_{len(delivered_files)}") - delivered_formats.append({ - 'filename': f"document_{len(delivered_files)}", - 'extension': 'unknown', - 'mimeType': 'application/octet-stream' - }) if isinstance(result_data, dict): if 'extractedContent' in result_data: extracted_content = result_data['extractedContent'] @@ -128,305 +229,4 @@ class HandlingActions: 'quality_score': 5, 'missing_elements': [], 'suggested_retry_approach': '' - } - - async def executeSingleAction(self, action, workflow): - """Execute a single action and return ActionResult with enhanced document processing""" - try: - # Use DocumentCreator methods - # Enhance parameters with expected document formats if specified - enhanced_parameters = action.execParameters.copy() - if action.expectedDocumentFormats: - enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats - logger.info(f"Action {action.execMethod}.{action.execAction} expects formats: {action.expectedDocumentFormats}") - result = await self.service.executeAction( - methodName=action.execMethod, - actionName=action.execAction, - parameters=enhanced_parameters - ) - result_label = action.execResultLabel - if result.success: - action.setSuccess() - action.result = result.data.get("result", "") - action.execResultLabel = result_label - await self.createActionMessage(action, result, workflow, result_label) - else: - action.setError(result.error or "Action execution failed") - documents = result.data.get("documents", []) - processed_documents = [] - for doc in documents: - if hasattr(doc, 'filename') and doc.filename: - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - if mime_type == "application/octet-stream": - mime_type = self.documentCreator.detectMimeTypeFromDocument(doc, doc.filename) - processed_documents.append({ - 'filename': doc.filename, - 'fileSize': getattr(doc, 'fileSize', 0), - 'mimeType': mime_type, - 'content': getattr(doc, 'content', ''), - 'document': doc - }) - elif isinstance(doc, dict): - filename = doc.get('documentName', doc.get('filename', f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}")) - fileSize = doc.get('fileSize', len(str(doc.get('documentData', '')))) - mimeType = doc.get('mimeType', 'application/octet-stream') - if mimeType == "application/octet-stream": - document_data = doc.get('documentData', '') - mimeType = self.documentCreator.detectMimeTypeFromContent(document_data, filename) - processed_documents.append({ - 'filename': filename, - 'fileSize': fileSize, - 'mimeType': mimeType, - 'content': doc.get('documentData', ''), - 'document': doc - }) - else: - logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}") - filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}" - mimeType = 'application/octet-stream' - mimeType = self.documentCreator.detectMimeTypeFromContent(doc, filename) - processed_documents.append({ - 'filename': filename, - 'fileSize': 0, - 'mimeType': mimeType, - 'content': str(doc), - 'document': doc - }) - return ActionResult( - success=result.success, - data={ - "result": result.data.get("result", ""), - "documents": processed_documents, - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "resultLabel": result_label - }, - metadata={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "resultLabel": result_label - }, - validation=[], - error=result.error or "" - ) - except Exception as e: - logger.error(f"Error executing single action: {str(e)}") - action.setError(str(e)) - return ActionResult( - success=False, - data={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "documents": [] - }, - metadata={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction - }, - validation=[], - error=str(e) - ) - - async def createActionMessage(self, action, result, workflow, result_label=None): - """Create and store a message for the action result in the workflow with enhanced document processing""" - try: - # Use DocumentCreator methods - result_data = result.data if hasattr(result, 'data') else {} - documents_data = result_data.get("documents", []) - if result_label is None: - result_label = action.execResultLabel - message_data = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Executed action {action.execMethod}.{action.execAction}", - "status": "step", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": datetime.now(UTC).isoformat(), - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "documentsLabel": result_label, # Use intent label from action definition - "documents": [] - } - if documents_data: - processed_documents = [] - for doc_data in documents_data: - try: - if isinstance(doc_data, dict): - document_name = doc_data.get("documentName", doc_data.get("filename", f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}")) - document_data = doc_data.get("documentData", {}) - file_size = doc_data.get("fileSize", 0) - mime_type = doc_data.get("mimeType", "application/octet-stream") - elif hasattr(doc_data, 'filename'): - document_name = doc_data.filename - document_data = getattr(doc_data, 'content', {}) - file_size = getattr(doc_data, 'fileSize', 0) - mime_type = getattr(doc_data, 'mimeType', "application/octet-stream") - else: - document_name = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}" - document_data = doc_data - file_size = len(str(doc_data)) - mime_type = "application/octet-stream" - if mime_type == "application/octet-stream": - mime_type = self.documentCreator.detectMimeTypeFromContent(document_data, document_name) - content = self.documentCreator.convertDocumentDataToString(document_data, self.documentCreator.getFileExtension(document_name)) - minimal_content_patterns = ['{}', '[]', 'null', '""', "''"] - if not content or content.strip() == "" or content.strip() in minimal_content_patterns: - logger.warning(f"Empty or minimal content for document {document_name}, skipping") - continue - file_id = self.service.createFile( - fileName=document_name, - mimeType=mime_type, - content=content, - base64encoded=False - ) - if not file_id: - logger.error(f"Failed to create file for document {document_name}") - continue - document = self.service.createDocument( - fileName=document_name, - mimeType=mime_type, - content=content, - base64encoded=False - ) - if document: - processed_documents.append(document) - logger.info(f"Created document: {document_name} with file ID: {file_id} and MIME type: {mime_type}") - else: - logger.error(f"Failed to create ChatDocument object for {document_name}") - except Exception as e: - logger.error(f"Error processing document {getattr(doc_data, 'documentName', 'unknown') if isinstance(doc_data, dict) else 'unknown'}: {str(e)}") - continue - message_data["documents"] = processed_documents - message = self.chatInterface.createWorkflowMessage(message_data) - if message: - workflow.messages.append(message) - logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(message_data.get('documents', []))} documents") - logger.debug(f"WORKFLOW STATE after createActionMessage: id={id(workflow)}, message_count={len(workflow.messages)}") - for idx, msg in enumerate(workflow.messages): - label = getattr(msg, 'documentsLabel', None) - docs = getattr(msg, 'documents', None) - logger.debug(f" Message {idx}: label='{label}', documents_count={len(docs) if docs else 0}") - else: - logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}") - except Exception as e: - logger.error(f"Error creating action message: {str(e)}") - - async def performTaskReview(self, review_context) -> 'ReviewResult': - """Perform AI-based task review with enhanced retry logic""" - try: - # Prepare prompt for result review - prompt = await createResultReviewPrompt(self, review_context) - - # Call AI with circuit breaker - response = await self._callAIWithCircuitBreaker(prompt, "result_review") - - # Parse review result - review_dict = self._parseReviewResponse(response) - - # Add default values for missing fields - review_dict.setdefault('status', 'unknown') - review_dict.setdefault('reason', 'No reason provided') - review_dict.setdefault('quality_score', 5) - - # Enhanced retry logic based on result quality - if review_dict.get('status') == 'retry': - # Analyze the specific issues for better retry guidance - action_results = review_context.action_results or [] - if action_results: - # Check for common issues that warrant retry - # Only consider empty results a problem if there are no documents produced - has_empty_results = any( - not result.data.get('result', '').strip() and - not result.data.get('documents') and - not result.data.get('documents') - for result in action_results - if result.success - ) - - has_incomplete_metadata = any( - any(doc.get('filename') == 'unknown' for doc in result.data.get('documents', []) or []) - for result in action_results - if result.success - ) - - if has_empty_results: - review_dict['improvements'] = (review_dict.get('improvements', '') + - " Ensure the document extraction returns actual content, not empty results. " + - "Check if the AI prompt is specific enough to extract meaningful data.") - - if has_incomplete_metadata: - review_dict['improvements'] = (review_dict.get('improvements', '') + - " Ensure proper document metadata is extracted including filename, size, and mime type. " + - "The document processing should provide complete file information.") - - # If we have specific issues, adjust quality score - if has_empty_results or has_incomplete_metadata: - review_dict['quality_score'] = max(1, review_dict.get('quality_score', 5) - 2) - - # Create ReviewResult model - return ReviewResult( - status=review_dict.get('status', 'unknown'), - reason=review_dict.get('reason', 'No reason provided'), - improvements=review_dict.get('improvements', []), - quality_score=review_dict.get('quality_score', 5), - missing_outputs=review_dict.get('missing_outputs', []), - met_criteria=review_dict.get('met_criteria', []), - unmet_criteria=review_dict.get('unmet_criteria', []), - confidence=review_dict.get('confidence', 0.5) - ) - - except Exception as e: - logger.error(f"Error performing task review: {str(e)}") - return ReviewResult( - status='success', # Default to success to avoid blocking workflow - reason=f'Review failed: {str(e)}', - quality_score=5, - confidence=0.5 - ) - - def parseActionResponse(self, response: str) -> list: - """Parse AI response into action list""" - try: - json_start = response.find('{') - json_end = response.rfind('}') + 1 - if json_start == -1 or json_end == 0: - raise ValueError("No JSON found in response") - json_str = response[json_start:json_end] - action_data = json.loads(json_str) - if 'actions' not in action_data: - raise ValueError("Action response missing 'actions' field") - return action_data['actions'] - except Exception as e: - logger.error(f"Error parsing action response: {str(e)}") - return [] - - def parseReviewResponse(self, response: str) -> dict: - """Parse AI response into review result""" - try: - json_start = response.find('{') - json_end = response.rfind('}') + 1 - if json_start == -1 or json_end == 0: - raise ValueError("No JSON found in response") - json_str = response[json_start:json_end] - review = json.loads(json_str) - if 'status' not in review: - raise ValueError("Review response missing 'status' field") - return review - except Exception as e: - logger.error(f"Error parsing review response: {str(e)}") - return {'status': 'failed', 'reason': f'Parse error: {str(e)}'} - - # Utility method for file extension - def _getFileExtension(self, filename): - if '.' in filename: - return filename.rsplit('.', 1)[-1].lower() - return '' - - # Placeholder methods for AI and prompt logic (to be implemented or injected) - async def _callAIWithCircuitBreaker(self, prompt, purpose): - raise NotImplementedError("_callAIWithCircuitBreaker must be implemented in the subclass or injected.") \ No newline at end of file + } diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py index f154a085..bb7443c9 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/chat/handling/handlingTasks.py @@ -30,7 +30,7 @@ class HandlingTasks: prompt = await self.service.callAiTextAdvanced( createTaskPlanningPrompt(self, { 'user_request': userInput, - 'available_documents': self._getAvailableDocuments(workflow), + 'available_documents': self.service.getAvailableDocuments(workflow), 'workflow_id': workflow.id }) ) @@ -55,7 +55,7 @@ class HandlingTasks: task_step=task_step, workflow=workflow, workflow_id=workflow.id, - available_documents=self._getAvailableDocuments(workflow), + available_documents=self.service.getAvailableDocuments(workflow), previous_results=previous_results or [], improvements=[], retry_count=0, @@ -205,13 +205,7 @@ class HandlingTasks: return {'error': str(e)} # --- Helper and validation methods (unchanged, but can be inlined or made private) --- - def _getAvailableDocuments(self, workflow): - documents = [] - for message in workflow.messages: - for doc in message.documents: - documents.append(doc.filename) - return documents - + def _parseTaskPlanResponse(self, response: str) -> dict: try: json_start = response.find('{') diff --git a/modules/chat/serviceCenter.py b/modules/chat/serviceCenter.py index 6099408d..8a0d5df6 100644 --- a/modules/chat/serviceCenter.py +++ b/modules/chat/serviceCenter.py @@ -13,7 +13,7 @@ from modules.interfaces.interfaceChatObjects import getInterface as getChatObjec from modules.interfaces.interfaceChatModel import ActionResult from modules.interfaces.interfaceComponentObjects import getInterface as getComponentObjects from modules.interfaces.interfaceAppObjects import getInterface as getAppObjects -from gateway.modules.chat.documents.documentProcessing import DocumentProcessor +from modules.chat.documents.documentExtraction import DocumentExtraction from modules.chat.methodBase import MethodBase import uuid @@ -37,7 +37,7 @@ class ServiceCenter: self.interfaceComponent = getComponentObjects(currentUser) self.interfaceApp = getAppObjects(currentUser) self.interfaceAiCalls = AiCalls() - self.documentProcessor = DocumentProcessor(self) + self.documentProcessor = DocumentExtraction(self) # Initialize methods catalog self.methods = {} @@ -259,6 +259,15 @@ class ServiceCenter: return filename.split('.')[-1].lower() return "txt" # Default to text + def getFileExtension(self, filename): + """ + Extract file extension from filename (without dot, lowercased). + Returns empty string if no extension is found. + """ + if '.' in filename: + return filename.rsplit('.', 1)[-1].lower() + return '' + # ===== Functions ===== def extractContent(self, prompt: str, document: ChatDocument) -> ExtractedContent: @@ -859,6 +868,22 @@ Please provide a clear summary of this message.""" logger.error(f"Error calculating user input size: {str(e)}") return 0 + def getAvailableDocuments(self, workflow) -> List[str]: + """ + Get list of available document filenames from workflow. + + Args: + workflow: ChatWorkflow object + + Returns: + List[str]: List of document filenames + """ + documents = [] + for message in workflow.messages: + for doc in message.documents: + documents.append(doc.filename) + return documents + async def executeAction(self, methodName: str, actionName: str, parameters: Dict[str, Any]) -> ActionResult: """Execute a method action""" try: diff --git a/notes/changelog.txt b/notes/changelog.txt index d566a796..4d63775e 100644 --- a/notes/changelog.txt +++ b/notes/changelog.txt @@ -1,11 +1,5 @@ TODO -- refactory of chat manager - - - - to put document modules into documents--> creation, extraction -> adapt references over global search - - - neutralizer to activate AND put back placeholders to the returned data - referenceHandling and authentication for connections in the method actions - check methods diff --git a/notes/methodbased_specification.md b/notes/methodbased_specification.md index 22c5ac89..6aa273c7 100644 --- a/notes/methodbased_specification.md +++ b/notes/methodbased_specification.md @@ -177,7 +177,7 @@ class ServiceCenter: self.tasks: Dict[str, AgentTask] = {} self.promptManager = AIPromptManager() self.taskStateManager = TaskStateManager() - self.documentProcessor = DocumentProcessor() + self.documentProcessor = DocumentExtraction() async def execute_task(self, task: AgentTask) -> None: """Execute task with improved error handling and timeout""" @@ -304,7 +304,7 @@ class DocumentContext(BaseModel): relevantSections: List[str] processingStatus: Dict[str, str] -class DocumentProcessor: +class DocumentExtraction: """Processes documents with context awareness""" def process_with_context(self, doc: Dict, context: DocumentContext) -> Dict: