From 12fd5e0a414bb6721ca2c3c9dbea3e67ea64f497 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Tue, 29 Jul 2025 11:39:02 +0200 Subject: [PATCH] clean flow --- ...nagerChat.py => OLD_BACKUP managerChat.py} | 0 modules/chat/documents/documentExtraction.py | 50 ++-- modules/chat/documents/documentGeneration.py | 37 +-- modules/chat/handling/executionState.py | 42 ++-- modules/chat/handling/handlingActions.py | 202 --------------- modules/chat/handling/handlingTasks.py | 234 +++++++++++++++--- modules/chat/managerChat.py | 63 +++-- modules/chat/methodBase.py | 6 +- modules/interfaces/interfaceChatModel.py | 89 +++++-- modules/interfaces/interfaceChatObjects.py | 2 +- modules/methods/methodDocument.py | 6 +- modules/workflow/managerWorkflow.py | 4 +- 12 files changed, 383 insertions(+), 352 deletions(-) rename modules/chat/{BACKUP managerChat.py => OLD_BACKUP managerChat.py} (100%) delete mode 100644 modules/chat/handling/handlingActions.py diff --git a/modules/chat/BACKUP managerChat.py b/modules/chat/OLD_BACKUP managerChat.py similarity index 100% rename from modules/chat/BACKUP managerChat.py rename to modules/chat/OLD_BACKUP managerChat.py diff --git a/modules/chat/documents/documentExtraction.py b/modules/chat/documents/documentExtraction.py index 8bd1a563..41588a62 100644 --- a/modules/chat/documents/documentExtraction.py +++ b/modules/chat/documents/documentExtraction.py @@ -88,7 +88,7 @@ class DocumentExtraction: import PyPDF2 import fitz # PyMuPDF for more extensive PDF processing pdfExtractorLoaded = True - logger.info("PDF extraction libraries successfully loaded") + logger.debug("📄 PDF extraction libraries successfully loaded") except ImportError as e: logger.warning(f"PDF extraction libraries could not be loaded: {e}") @@ -101,7 +101,7 @@ class DocumentExtraction: import docx # python-docx for Word documents import openpyxl # for Excel files officeExtractorLoaded = True - logger.info("Office extraction libraries successfully loaded") + logger.debug("📄 Office extraction libraries successfully loaded") except ImportError as e: logger.warning(f"Office extraction libraries could not be loaded: {e}") @@ -113,7 +113,7 @@ class DocumentExtraction: global PIL, Image from PIL import Image imageProcessorLoaded = True - logger.info("Image processing libraries successfully loaded") + logger.debug("📄 Image processing libraries successfully loaded") except ImportError as e: logger.warning(f"Image processing libraries could not be loaded: {e}") @@ -157,7 +157,7 @@ class DocumentExtraction: processedItems = await self._aiDataExtraction(contentItems, prompt) contentItems = processedItems except Exception as e: - logger.error(f"Error processing content with AI: {str(e)}") + logger.error(f"❌ Error processing content with AI: {str(e)}") return ExtractedContent( id=documentId if documentId else str(uuid.uuid4()), @@ -165,7 +165,7 @@ class DocumentExtraction: ) except Exception as e: - logger.error(f"Error processing file data: {str(e)}") + logger.error(f"❌ Error processing file data: {str(e)}") raise FileProcessingError(f"Failed to process file data: {str(e)}") @@ -187,7 +187,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing text document: {str(e)}") + logger.error(f"❌ Error processing text document: {str(e)}") raise FileProcessingError(f"Failed to process text document: {str(e)}") async def _processCsv(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -206,7 +206,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing CSV document: {str(e)}") + logger.error(f"❌ Error processing CSV document: {str(e)}") raise FileProcessingError(f"Failed to process CSV document: {str(e)}") async def _processJson(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -226,7 +226,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing JSON document: {str(e)}") + logger.error(f"❌ Error processing JSON document: {str(e)}") raise FileProcessingError(f"Failed to process JSON document: {str(e)}") async def _processXml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -245,7 +245,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing XML document: {str(e)}") + logger.error(f"❌ Error processing XML document: {str(e)}") raise FileProcessingError(f"Failed to process XML document: {str(e)}") async def _processHtml(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -264,7 +264,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing HTML document: {str(e)}") + logger.error(f"❌ Error processing HTML document: {str(e)}") raise FileProcessingError(f"Failed to process HTML document: {str(e)}") async def _processSvg(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -284,7 +284,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing SVG document: {str(e)}") + logger.error(f"❌ Error processing SVG document: {str(e)}") raise FileProcessingError(f"Failed to process SVG document: {str(e)}") async def _processImage(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -315,7 +315,7 @@ class DocumentExtraction: metadata=metadata )] except Exception as e: - logger.error(f"Error processing image document: {str(e)}") + logger.error(f"❌ Error processing image document: {str(e)}") raise FileProcessingError(f"Failed to process image document: {str(e)}") async def _processPdf(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -378,13 +378,13 @@ class DocumentExtraction: ) )) except Exception as imgE: - logger.warning(f"Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}") + logger.warning(f"⚠️ Error extracting image {imgIndex} on page {pageNum + 1}: {str(imgE)}") doc.close() return contentItems except Exception as e: - logger.error(f"Error processing PDF document: {str(e)}") + logger.error(f"❌ Error processing PDF document: {str(e)}") raise FileProcessingError(f"Failed to process PDF document: {str(e)}") async def _processDocx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -423,7 +423,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing Word document: {str(e)}") + logger.error(f"❌ Error processing Word document: {str(e)}") raise FileProcessingError(f"Failed to process Word document: {str(e)}") async def _processXlsx(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -465,7 +465,7 @@ class DocumentExtraction: return contentItems except Exception as e: - logger.error(f"Error processing Excel document: {str(e)}") + logger.error(f"❌ Error processing Excel document: {str(e)}") raise FileProcessingError(f"Failed to process Excel document: {str(e)}") async def _processBinary(self, fileData: bytes, filename: str, mimeType: str) -> List[ContentItem]: @@ -482,7 +482,7 @@ class DocumentExtraction: ) )] except Exception as e: - logger.error(f"Error processing binary document: {str(e)}") + logger.error(f"❌ Error processing binary document: {str(e)}") raise FileProcessingError(f"Failed to process binary document: {str(e)}") async def _aiDataExtraction(self, contentItems: List[ContentItem], prompt: str) -> List[ContentItem]: @@ -502,7 +502,7 @@ class DocumentExtraction: try: # Get content type from metadata mimeType = item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else "text/plain" - logger.debug(f"Processing content item with MIME type: {mimeType}, label: {item.label}") + logger.debug(f"📄 Processing content item with MIME type: {mimeType}, label: {item.label}") # Chunk content based on type if mimeType.startswith('text/'): @@ -527,12 +527,12 @@ class DocumentExtraction: for chunk in chunks: # Process with AI based on content type try: - logger.debug(f"AI processing chunk with MIME type: {mimeType}") + logger.debug(f"🤖 AI processing chunk with MIME type: {mimeType}") if mimeType.startswith('image/'): # For images, use image AI service with base64 data # chunk is already base64 encoded string from _processImage # Use the original prompt directly for images (no content embedding) - logger.debug(f"Calling image AI service for MIME type: {mimeType}") + logger.debug(f"🤖 Calling image AI service for MIME type: {mimeType}") processedContent = await self._serviceCenter.callAiImageBasic(prompt, chunk, mimeType) else: # For text content, use text AI service @@ -553,14 +553,14 @@ class DocumentExtraction: Return ONLY the extracted information in a clear, concise format. """ - logger.debug(f"Calling text AI service for MIME type: {mimeType}") + logger.debug(f"🤖 Calling text AI service for MIME type: {mimeType}") processedContent = await self._serviceCenter.callAiTextBasic(aiPrompt, contentToProcess) chunkResults.append(processedContent) except Exception as aiError: - logger.error(f"AI processing failed for chunk: {str(aiError)}") - # Fallback to original content - chunkResults.append(chunk) + logger.error(f"❌ AI processing failed for chunk: {str(aiError)}") + # Fallback to original content + chunkResults.append(chunk) # Combine chunk results combinedResult = "\n".join(chunkResults) @@ -578,7 +578,7 @@ class DocumentExtraction: )) except Exception as e: - logger.error(f"Error processing content chunk: {str(e)}") + logger.error(f"❌ Error processing content chunk: {str(e)}") # Add original content if processing fails processedItems.append(item) diff --git a/modules/chat/documents/documentGeneration.py b/modules/chat/documents/documentGeneration.py index 744dc582..6527e2e5 100644 --- a/modules/chat/documents/documentGeneration.py +++ b/modules/chat/documents/documentGeneration.py @@ -22,11 +22,13 @@ class DocumentGenerator: """ try: documents = action_result.data.get("documents", []) + logger.debug(f"Processing {len(documents)} documents from action result") processed_documents = [] for doc in documents: processed_doc = self.processSingleDocument(doc, action) if processed_doc: processed_documents.append(processed_doc) + logger.debug(f"Successfully processed {len(processed_documents)} documents") return processed_documents except Exception as e: logger.error(f"Error processing action result documents: {str(e)}") @@ -119,45 +121,14 @@ class DocumentGenerator: ) if document: created_documents.append(document) - logger.info(f"Created document: {document_name} with file ID: {file_id} and MIME type: {mime_type}") + logger.debug(f"Created document: {document_name} ({len(content)} bytes, {mime_type})") else: logger.error(f"Failed to create ChatDocument object for {document_name}") except Exception as e: logger.error(f"Error creating document {doc_data.get('filename', 'unknown')}: {str(e)}") continue + logger.info(f"Created {len(created_documents)} documents from action result") return created_documents except Exception as e: logger.error(f"Error creating documents from action result: {str(e)}") return [] - - @staticmethod - def get_delivered_files_and_formats(documents): - delivered_files = [] - delivered_formats = [] - for doc in documents: - if hasattr(doc, 'filename'): - delivered_files.append(doc.filename) - file_extension = getFileExtension(doc.filename) - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc.filename, - 'extension': file_extension, - 'mimeType': mime_type - }) - elif isinstance(doc, dict) and 'filename' in doc: - delivered_files.append(doc['filename']) - file_extension = getFileExtension(doc['filename']) - mime_type = doc.get('mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc['filename'], - 'extension': file_extension, - 'mimeType': mime_type - }) - else: - delivered_files.append(f"document_{len(delivered_files)}") - delivered_formats.append({ - 'filename': f"document_{len(delivered_files)}", - 'extension': 'unknown', - 'mimeType': 'application/octet-stream' - }) - return delivered_files, delivered_formats \ No newline at end of file diff --git a/modules/chat/handling/executionState.py b/modules/chat/handling/executionState.py index 5415cadc..5e8a4b8c 100644 --- a/modules/chat/handling/executionState.py +++ b/modules/chat/handling/executionState.py @@ -1,42 +1,56 @@ # executionState.py # Contains all execution state management logic extracted from managerChat.py +import logging from typing import List -from modules.interfaces.interfaceChatModel import TaskStep, ActionExecutionResult +from datetime import datetime, UTC +from modules.interfaces.interfaceChatModel import TaskStep, ActionResult + +logger = logging.getLogger(__name__) class TaskExecutionState: - """Manages state during task execution with retry logic""" + """Manages execution state for a task with retry logic""" + def __init__(self, task_step: TaskStep): self.task_step = task_step - self.successful_actions: List[ActionExecutionResult] = [] # Preserved across retries - self.failed_actions: List[ActionExecutionResult] = [] # For analysis + self.successful_actions: List[ActionResult] = [] # Preserved across retries + self.failed_actions: List[ActionResult] = [] # For analysis self.current_action_index = 0 self.retry_count = 0 - self.improvements = [] - self.partial_results = {} # Store intermediate results self.max_retries = 3 - - def addSuccessfulAction(self, action_result: ActionExecutionResult): + + def addSuccessfulAction(self, action_result: ActionResult): + """Add a successful action to the state""" self.successful_actions.append(action_result) - if action_result.data.get('resultLabel'): - self.partial_results[action_result.data['resultLabel']] = action_result - - def addFailedAction(self, action_result: ActionExecutionResult): + self.current_action_index += 1 + + def addFailedAction(self, action_result: ActionResult): + """Add a failed action to the state for analysis""" self.failed_actions.append(action_result) + self.current_action_index += 1 def getAvailableResults(self) -> list: - return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')] + """Get available results from successful actions""" + results = [] + for action in self.successful_actions: + if action.data and action.data.get('result'): + results.append(action.data['result']) + return results def shouldRetryTask(self) -> bool: - return len(self.successful_actions) > 0 and len(self.failed_actions) > 0 + """Determine if task should be retried based on failure patterns""" + return len(self.failed_actions) > 0 and self.canRetry() def canRetry(self) -> bool: + """Check if task can be retried""" return self.retry_count < self.max_retries def incrementRetryCount(self): + """Increment retry count""" self.retry_count += 1 def getFailurePatterns(self) -> list: + """Analyze failure patterns from failed actions""" patterns = [] for action in self.failed_actions: error = action.error.lower() if action.error else '' diff --git a/modules/chat/handling/handlingActions.py b/modules/chat/handling/handlingActions.py deleted file mode 100644 index f0bdcc1c..00000000 --- a/modules/chat/handling/handlingActions.py +++ /dev/null @@ -1,202 +0,0 @@ -# handlingActions.py -# Contains all action handling functions extracted from managerChat.py - -import logging -import json -import time -from typing import Dict, Any, Optional, List, Union -from datetime import datetime, UTC -from modules.interfaces.interfaceChatModel import ReviewResult, ActionResult -from .promptFactory import createResultReviewPrompt -from modules.chat.documents.documentGeneration import DocumentGenerator - -logger = logging.getLogger(__name__) - -class HandlingActions: - def __init__(self, service, chatInterface): - self.service = service - self.chatInterface = chatInterface - self.documentGenerator = DocumentGenerator(service) - - async def executeSingleAction(self, action, workflow): - """Execute a single action and return ActionResult with enhanced document processing""" - try: - enhanced_parameters = action.execParameters.copy() - if action.expectedDocumentFormats: - enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats - logger.info(f"Action {action.execMethod}.{action.execAction} expects formats: {action.expectedDocumentFormats}") - result = await self.service.executeAction( - methodName=action.execMethod, - actionName=action.execAction, - parameters=enhanced_parameters - ) - result_label = action.execResultLabel - if result.success: - action.setSuccess() - action.result = result.data.get("result", "") - action.execResultLabel = result_label - await self.createActionMessage(action, result, workflow, result_label) - else: - action.setError(result.error or "Action execution failed") - processed_documents = self.documentGenerator.processActionResultDocuments(result, action, workflow) - return ActionResult( - success=result.success, - data={ - "result": result.data.get("result", ""), - "documents": processed_documents, - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "resultLabel": result_label - }, - metadata={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "resultLabel": result_label - }, - validation=[], - error=result.error or "" - ) - except Exception as e: - logger.error(f"Error executing single action: {str(e)}") - action.setError(str(e)) - return ActionResult( - success=False, - data={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "documents": [] - }, - metadata={ - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction - }, - validation=[], - error=str(e) - ) - - async def validateActionResult(self, action_result, action, context) -> dict: - try: - prompt = self._createGenericValidationPrompt(action_result, action, context) - response = await self.service.callAiTextAdvanced(prompt, "action_validation") - validation = self._parseValidationResponse(response) - validation['action_id'] = action.id - validation['action_method'] = action.execMethod - validation['action_name'] = action.execAction - validation['result_label'] = action.execResultLabel - return validation - except Exception as e: - logger.error(f"Error validating action result: {str(e)}") - return { - 'status': 'success', - 'reason': f'Validation failed: {str(e)}', - 'confidence': 0.5, - 'improvements': [], - 'action_id': action.id, - 'action_method': action.execMethod, - 'action_name': action.execAction, - 'result_label': action.execResultLabel - } - - async def createActionMessage(self, action, result, workflow, result_label=None): - """Create and store a message for the action result in the workflow with enhanced document processing""" - try: - if result_label is None: - result_label = action.execResultLabel - message_data = { - "workflowId": workflow.id, - "role": "assistant", - "message": f"Executed action {action.execMethod}.{action.execAction}", - "status": "step", - "sequenceNr": len(workflow.messages) + 1, - "publishedAt": datetime.now(UTC).isoformat(), - "actionId": action.id, - "actionMethod": action.execMethod, - "actionName": action.execAction, - "documentsLabel": result_label, - "documents": [] - } - # Use the local createDocumentsFromActionResult method - created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow) - message_data["documents"] = created_documents - message = self.chatInterface.createWorkflowMessage(message_data) - if message: - workflow.messages.append(message) - logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(created_documents)} documents") - logger.debug(f"WORKFLOW STATE after createActionMessage: id={id(workflow)}, message_count={len(workflow.messages)}") - for idx, msg in enumerate(workflow.messages): - label = getattr(msg, 'documentsLabel', None) - docs = getattr(msg, 'documents', None) - logger.debug(f" Message {idx}: label='{label}', documents_count={len(docs) if docs else 0}") - else: - logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}") - except Exception as e: - logger.error(f"Error creating action message: {str(e)}") - - # Internal helper methods - - def _createGenericValidationPrompt(self, action_result, action, context) -> str: - success = action_result.success - result_data = action_result.data - error = action_result.error - validation_messages = action_result.validation - result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data) - documents = result_data.get("documents", []) if isinstance(result_data, dict) else [] - doc_count = len(documents) - expected_result_label = action.execResultLabel - expected_format = action.execParameters.get('outputFormat', 'unknown') - expected_document_formats = action.expectedDocumentFormats or [] - actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else "" - result_label_match = actual_result_label == expected_result_label - # Use DocumentGenerator for file/format extraction - delivered_files, delivered_formats = DocumentGenerator.get_delivered_files_and_formats(documents) - content_items = [] - if isinstance(result_data, dict): - if 'extractedContent' in result_data: - extracted_content = result_data['extractedContent'] - if hasattr(extracted_content, 'contents'): - content_items = extracted_content.contents - elif 'contents' in result_data: - content_items = result_data['contents'] - if delivered_files and not content_items: - content_items = [f"File content available in: {', '.join(delivered_files)}"] - content_summary = [] - for item in content_items: - if hasattr(item, 'label') and hasattr(item, 'metadata'): - content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}") - elif isinstance(item, str): - content_summary.append(item) - else: - content_summary.append(str(item)) - return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format.\n\nACTION DETAILS:\n- Method: {action.execMethod}\n- Action: {action.execAction}\n- Expected Result Label: {expected_result_label}\n- Actual Result Label: {actual_result_label}\n- Result Label Match: {result_label_match}\n- Expected Format: {expected_format}\n- Expected Document Formats: {json.dumps(expected_document_formats, indent=2) if expected_document_formats else 'None specified'}\n- Parameters: {json.dumps(action.execParameters, indent=2)}\n\nRESULT TO VALIDATE:\n- Success: {success}\n- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''}\n- Error: {error}\n- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'}\n- Documents Produced: {doc_count}\n- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'}\n- Delivered Formats: {json.dumps(delivered_formats, indent=2) if delivered_formats else 'None'}\n- Content Items: {', '.join(content_summary) if content_summary else 'None'}\n\nCRITICAL VALIDATION CRITERIA:\n1. **Result Label Match**: Does the action result contain the expected result label?\n2. **File Delivery**: Did the action deliver the promised result file(s)?\n3. **Format Compliance**: If expected document formats were specified, do the delivered files match the expected formats?\n4. **Content Quality**: Is the content of the delivered files usable and complete?\n5. **Content Processing**: If content extraction was expected, was it performed correctly?\n\nCONTEXT:\n- Task Description: {context.task_step.description if context.task_step else 'Unknown'}\n- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'}\n\nVALIDATION INSTRUCTIONS:\n1. **Result Label Check**: Verify that the expected result label \"{expected_result_label}\" is present in the action result data. This is the primary success criterion.\n2. **File Delivery**: Check if files were delivered when expected. The individual filenames don't need to match the result label - focus on whether content was actually produced.\n3. **Format Compliance**: If expected document formats were specified, check if delivered files match the expected extensions and MIME types. If no formats were specified, this criterion is satisfied.\n4. **Content Quality**: If files were delivered, consider the action successful. The presence of delivered files indicates content was processed and stored.\n5. **Content Processing**: If files were delivered, assume content extraction was performed correctly. The file delivery is evidence of successful processing.\n6. **Success Criteria**: The action is successful if the result label matches AND files were delivered. If expected formats were specified, they should also match.\n\nIMPORTANT NOTES:\n- The result label must be present in the action result data for success\n- Individual filenames can be different from the result label\n- If files were delivered, consider the action successful even if content details are not provided\n- Focus on whether the action accomplished its intended purpose (file delivery)\n- Empty files should be considered failures, but delivered files indicate success\n\nREQUIRED JSON RESPONSE:\n{{\n \"status\": \"success|retry|fail\",\n \"reason\": \"Detailed explanation focusing on result label match and content quality\",\n \"confidence\": 0.0-1.0,\n \"improvements\": [\"specific improvements if needed\"],\n \"quality_score\": 1-10,\n \"missing_elements\": [\"missing result label\", \"missing files\", \"content issues\"],\n \"suggested_retry_approach\": \"Specific approach for retry if status is retry\"\n}}\n\nNOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" - - def _parseValidationResponse(self, response: str) -> dict: - try: - json_start = response.find('{') - json_end = response.rfind('}') + 1 - if json_start == -1 or json_end == 0: - raise ValueError("No JSON found in validation response") - json_str = response[json_start:json_end] - validation = json.loads(json_str) - if 'status' not in validation: - raise ValueError("Validation response missing 'status' field") - validation.setdefault('confidence', 0.5) - validation.setdefault('improvements', []) - validation.setdefault('quality_score', 5) - validation.setdefault('missing_elements', []) - validation.setdefault('suggested_retry_approach', '') - return validation - except Exception as e: - logger.error(f"Error parsing validation response: {str(e)}") - return { - 'status': 'success', - 'reason': f'Parse error: {str(e)}', - 'confidence': 0.5, - 'improvements': [], - 'quality_score': 5, - 'missing_elements': [], - 'suggested_retry_approach': '' - } diff --git a/modules/chat/handling/handlingTasks.py b/modules/chat/handling/handlingTasks.py index 6f4d24d4..c78ce995 100644 --- a/modules/chat/handling/handlingTasks.py +++ b/modules/chat/handling/handlingTasks.py @@ -8,11 +8,11 @@ import time from typing import Dict, Any, Optional, List, Union from datetime import datetime, UTC from modules.interfaces.interfaceChatModel import ( - TaskStatus, TaskStep, TaskContext, TaskAction, ActionExecutionResult, ReviewResult, TaskPlan, WorkflowResult, TaskResult, ReviewContext + TaskStatus, TaskStep, TaskContext, TaskAction, ReviewResult, TaskPlan, WorkflowResult, TaskResult, ReviewContext, ActionResult ) from .executionState import TaskExecutionState -from .handlingActions import HandlingActions from .promptFactory import createTaskPlanningPrompt, createActionDefinitionPrompt, createResultReviewPrompt +from modules.chat.documents.documentGeneration import DocumentGenerator logger = logging.getLogger(__name__) @@ -21,28 +21,50 @@ class HandlingTasks: self.chatInterface = chatInterface self.service = service self.workflow = workflow - self.handlingActions = HandlingActions(service, chatInterface) + self.documentGenerator = DocumentGenerator(service) async def generateTaskPlan(self, userInput: str, workflow) -> TaskPlan: """Generate a high-level task plan for the workflow.""" try: logger.info(f"Generating task plan for workflow {workflow.id}") + available_docs = self.service.getAvailableDocuments(workflow) + logger.debug(f"Available documents: {available_docs}") + prompt = await self.service.callAiTextAdvanced( createTaskPlanningPrompt(self, { 'user_request': userInput, - 'available_documents': self.service.getAvailableDocuments(workflow), + 'available_documents': available_docs, 'workflow_id': workflow.id }) ) - task_plan_dict = self._parseTaskPlanResponse(prompt) + # Inline _parseTaskPlanResponse logic + try: + json_start = prompt.find('{') + json_end = prompt.rfind('}') + 1 + if json_start == -1 or json_end == 0: + raise ValueError("No JSON found in response") + json_str = prompt[json_start:json_end] + task_plan_dict = json.loads(json_str) + if 'tasks' not in task_plan_dict: + raise ValueError("Task plan missing 'tasks' field") + except Exception as e: + logger.error(f"Error parsing task plan response: {str(e)}") + task_plan_dict = {'tasks': []} + if not self._validateTaskPlan(task_plan_dict): logger.error("Generated task plan failed validation") raise Exception("AI-generated task plan failed validation - AI is required for task planning") + tasks = [TaskStep(**task_dict) for task_dict in task_plan_dict.get('tasks', [])] - return TaskPlan( + task_plan = TaskPlan( overview=task_plan_dict.get('overview', ''), tasks=tasks ) + + logger.info(f"Task plan generated successfully with {len(tasks)} tasks") + logger.debug(f"Task plan: {json.dumps(task_plan_dict, indent=2)}") + + return task_plan except Exception as e: logger.error(f"Error in generateTaskPlan: {str(e)}") raise @@ -51,11 +73,17 @@ class HandlingTasks: """Generate actions for a given task step.""" try: logger.info(f"Generating actions for task: {task_step.description}") + + available_docs = self.service.getAvailableDocuments(workflow) + available_connections = self.service.getConnectionReferenceList() + logger.debug(f"Available documents: {available_docs}") + logger.debug(f"Available connections: {available_connections}") + context = enhanced_context or TaskContext( task_step=task_step, workflow=workflow, workflow_id=workflow.id, - available_documents=self.service.getAvailableDocuments(workflow), + available_documents=available_docs, previous_results=previous_results or [], improvements=[], retry_count=0, @@ -67,7 +95,7 @@ class HandlingTasks: successful_actions=[] ) prompt = await self.service.callAiTextAdvanced( - createActionDefinitionPrompt(self, context) + await createActionDefinitionPrompt(self, context) ) # Inline parseActionResponse logic here json_start = prompt.find('{') @@ -86,6 +114,7 @@ class HandlingTasks: if not self._validateActions(actions, context): logger.error("Generated actions failed validation") raise Exception("AI-generated actions failed validation - AI is required for action generation") + # Convert to TaskAction objects task_actions = [self.chatInterface.createTaskAction({ "execMethod": a.get('method', 'unknown'), @@ -95,7 +124,12 @@ class HandlingTasks: "expectedDocumentFormats": a.get('expectedDocumentFormats', None), "status": TaskStatus.PENDING }) for a in actions] - return [ta for ta in task_actions if ta] + + valid_actions = [ta for ta in task_actions if ta] + logger.info(f"Generated {len(valid_actions)} actions for task: {task_step.description}") + logger.debug(f"Task actions plan: {json.dumps(action_data, indent=2)}") + + return valid_actions except Exception as e: logger.error(f"Error in generateTaskActions: {str(e)}") return [] @@ -114,7 +148,7 @@ class HandlingTasks: break action_results = [] for action in actions: - result = await self.handlingActions.executeSingleAction(action, workflow) + result = await self.executeSingleAction(action, workflow) action_results.append(result) if result.success: state.addSuccessfulAction(result) @@ -195,14 +229,36 @@ class HandlingTasks: review.setdefault('status', 'unknown') review.setdefault('reason', 'No reason provided') review.setdefault('quality_score', 5) + + # Ensure improvements is a list + improvements = review.get('improvements', []) + if isinstance(improvements, str): + # Split string into list if it's a single improvement + improvements = [improvements.strip()] if improvements.strip() else [] + elif not isinstance(improvements, list): + improvements = [] + + # Ensure all list fields are properly typed + missing_outputs = review.get('missing_outputs', []) + if not isinstance(missing_outputs, list): + missing_outputs = [] + + met_criteria = review.get('met_criteria', []) + if not isinstance(met_criteria, list): + met_criteria = [] + + unmet_criteria = review.get('unmet_criteria', []) + if not isinstance(unmet_criteria, list): + unmet_criteria = [] + return ReviewResult( status=review.get('status', 'unknown'), reason=review.get('reason', 'No reason provided'), - improvements=review.get('improvements', []), + improvements=improvements, quality_score=review.get('quality_score', 5), - missing_outputs=review.get('missing_outputs', []), - met_criteria=review.get('met_criteria', []), - unmet_criteria=review.get('unmet_criteria', []), + missing_outputs=missing_outputs, + met_criteria=met_criteria, + unmet_criteria=unmet_criteria, confidence=review.get('confidence', 0.5) ) except Exception as e: @@ -215,6 +271,23 @@ class HandlingTasks: async def prepareTaskHandover(self, task_step, task_actions, review_result, workflow): try: + # Log handover status summary + if hasattr(review_result, 'status'): + status = review_result.status + if hasattr(review_result, 'missing_outputs'): + missing = review_result.missing_outputs + else: + missing = [] + if hasattr(review_result, 'met_criteria'): + met = review_result.met_criteria + else: + met = [] + + logger.debug(f"Task handover status: {status}") + logger.debug(f"Promised documents: {task_step.expected_outputs}") + logger.debug(f"Delivered documents: {met}") + logger.debug(f"Missing documents: {missing}") + handover_data = { 'task_id': task_step.id, 'task_description': task_step.description, @@ -229,22 +302,127 @@ class HandlingTasks: logger.error(f"Error in prepareTaskHandover: {str(e)}") return {'error': str(e)} - # --- Helper and validation methods (unchanged, but can be inlined or made private) --- - - def _parseTaskPlanResponse(self, response: str) -> dict: + # --- Helper action handling methods --- + + async def executeSingleAction(self, action, workflow): + """Execute a single action and return ActionResult with enhanced document processing""" try: - json_start = response.find('{') - json_end = response.rfind('}') + 1 - if json_start == -1 or json_end == 0: - raise ValueError("No JSON found in response") - json_str = response[json_start:json_end] - task_plan = json.loads(json_str) - if 'tasks' not in task_plan: - raise ValueError("Task plan missing 'tasks' field") - return task_plan + logger.info(f"Executing action: {action.execMethod}.{action.execAction}") + + # Log input documents and connections + input_docs = action.execParameters.get('documentList', []) + logger.debug(f"Input documents: {input_docs}") + logger.debug(f"Input connections: {action.execParameters.get('connections', [])}") + + enhanced_parameters = action.execParameters.copy() + if action.expectedDocumentFormats: + enhanced_parameters['expectedDocumentFormats'] = action.expectedDocumentFormats + logger.debug(f"Expected document formats: {action.expectedDocumentFormats}") + + result = await self.service.executeAction( + methodName=action.execMethod, + actionName=action.execAction, + parameters=enhanced_parameters + ) + result_label = action.execResultLabel + + if result.success: + action.setSuccess() + action.result = result.data.get("result", "") + action.execResultLabel = result_label + await self.createActionMessage(action, result, workflow, result_label) + logger.info(f"Action {action.execMethod}.{action.execAction} executed successfully") + else: + action.setError(result.error or "Action execution failed") + logger.error(f"Action {action.execMethod}.{action.execAction} failed: {result.error}") + + return ActionResult( + success=result.success, + data={ + "result": result.data.get("result", ""), + "documents": [], # Documents will be processed in createActionMessage + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "resultLabel": result_label + }, + validation={}, + error=result.error or "" + ) except Exception as e: - logger.error(f"Error parsing task plan response: {str(e)}") - return {'tasks': []} + logger.error(f"Error executing single action: {str(e)}") + action.setError(str(e)) + return ActionResult( + success=False, + data={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documents": [] + }, + metadata={ + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction + }, + validation={}, + error=str(e) + ) + + async def createActionMessage(self, action, result, workflow, result_label=None): + """Create and store a message for the action result in the workflow with enhanced document processing""" + try: + if result_label is None: + result_label = action.execResultLabel + + # Use the local createDocumentsFromActionResult method + created_documents = self.documentGenerator.createDocumentsFromActionResult(result, action, workflow) + + # Log delivered documents with sizes + if created_documents: + doc_info = [] + for doc in created_documents: + if hasattr(doc, 'filename') and hasattr(doc, 'fileSize'): + doc_info.append(f"{doc.filename} ({doc.fileSize} bytes)") + elif hasattr(doc, 'filename'): + doc_info.append(f"{doc.filename}") + else: + doc_info.append("unknown document") + logger.debug(f"Produced result label: {result_label}") + logger.debug(f"Delivered documents: {doc_info}") + else: + logger.debug(f"Produced result label: {result_label} (no documents)") + + message_data = { + "workflowId": workflow.id, + "role": "assistant", + "message": f"Executed action {action.execMethod}.{action.execAction}", + "status": "step", + "sequenceNr": len(workflow.messages) + 1, + "publishedAt": datetime.now(UTC).isoformat(), + "actionId": action.id, + "actionMethod": action.execMethod, + "actionName": action.execAction, + "documentsLabel": result_label, + "documents": created_documents + } + + message = self.chatInterface.createWorkflowMessage(message_data) + if message: + workflow.messages.append(message) + logger.info(f"Created action message for {action.execMethod}.{action.execAction} with {len(created_documents)} documents") + else: + logger.error(f"Failed to create workflow message for action {action.execMethod}.{action.execAction}") + except Exception as e: + logger.error(f"Error creating action message: {str(e)}") + + # --- Helper validation methods --- def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool: try: diff --git a/modules/chat/managerChat.py b/modules/chat/managerChat.py index 262563da..3531644c 100644 --- a/modules/chat/managerChat.py +++ b/modules/chat/managerChat.py @@ -1,7 +1,7 @@ import logging from typing import Dict, Any, List from modules.interfaces.interfaceAppModel import User -from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest, TaskStep, TaskAction, ActionExecutionResult, ReviewResult, TaskPlan, WorkflowResult, TaskContext +from modules.interfaces.interfaceChatModel import ChatWorkflow, UserInputRequest, TaskStep, TaskAction, ActionResult, ReviewResult, TaskPlan, WorkflowResult, TaskContext from modules.chat.serviceCenter import ServiceCenter from modules.interfaces.interfaceChatObjects import ChatObjects from .handling.handlingTasks import HandlingTasks @@ -30,45 +30,58 @@ class ChatManager: """Unified Workflow Execution""" try: logger.info(f"Starting unified workflow execution for workflow {workflow.id}") + logger.debug(f"User request: {userInput.prompt}") + # Phase 1: High-Level Task Planning - task_plan = await self.handlingTasks.planHighLevelTasks(userInput.userRequest, workflow) + logger.info("Phase 1: Generating task plan") + task_plan = await self.handlingTasks.generateTaskPlan(userInput.prompt, workflow) if not task_plan or not task_plan.tasks: raise Exception("No tasks generated in task plan.") - workflow.taskPlan = task_plan - # Phase 2-5: For each task, define actions, execute, review, and handover + + # Phase 2-5: For each task, execute and get results + logger.info(f"Phase 2: Executing {len(task_plan.tasks)} tasks") all_task_results = [] + previous_results = [] for idx, task_step in enumerate(task_plan.tasks): - logger.info(f"Processing task {idx+1}/{len(task_plan.tasks)}: {task_step.description}") - # Define actions - previous_results = self.handlingTasks.getPreviousResults(task_step) if hasattr(self.handlingTasks, 'getPreviousResults') else [] - actions = await self.handlingTasks.generateTaskActions(task_step, workflow, previous_results=previous_results) - if not actions: - logger.warning(f"No actions defined for task {task_step.id}, skipping.") - continue - # Execute actions and get results (including review_result) - task_result = await self.handlingTasks.executeTaskActions(actions, workflow) - # task_result should include action_results and review_result - action_results = getattr(task_result, 'action_results', None) - review_result = getattr(task_result, 'review_result', None) + logger.info(f"Task {idx+1}/{len(task_plan.tasks)}: {task_step.description}") + # Create task context for this task + task_context = TaskContext( + task_step=task_step, + workflow=workflow, + workflow_id=workflow.id, + available_documents=self.service.getAvailableDocuments(workflow), + previous_results=previous_results + ) + # Execute task (this handles action generation, execution, and review internally) + task_result = await self.handlingTasks.executeTask(task_step, workflow, task_context) # Handover - handover_data = await self.handlingTasks.prepareTaskHandover(task_step, actions, review_result, workflow) + handover_data = await self.handlingTasks.prepareTaskHandover(task_step, [], task_result, workflow) # Collect results all_task_results.append({ 'task_step': task_step, - 'actions': actions, - 'action_results': action_results, - 'review_result': review_result, + 'task_result': task_result, 'handover_data': handover_data }) + # Update previous results for next task + if task_result.success and task_result.feedback: + previous_results.append(task_result.feedback) + # Final workflow result workflow_result = WorkflowResult( status="completed", - task_results=all_task_results, - workflow=workflow + completed_tasks=len(all_task_results), + total_tasks=len(task_plan.tasks), + execution_time=0.0, # TODO: Calculate actual execution time + final_results_count=len(all_task_results) ) - logger.info(f"Unified workflow execution completed for workflow {workflow.id}") + logger.info(f"Unified workflow execution completed successfully for workflow {workflow.id}") return workflow_result except Exception as e: logger.error(f"Error in executeUnifiedWorkflow: {str(e)}") - from modules.interfaces.interfaceChatModel import WorkflowResult - return WorkflowResult(status="failed", task_results=[], workflow=workflow) + return WorkflowResult( + status="failed", + completed_tasks=0, + total_tasks=0, + execution_time=0.0, + final_results_count=0 + ) diff --git a/modules/chat/methodBase.py b/modules/chat/methodBase.py index 8f09cb52..863a7ddb 100644 --- a/modules/chat/methodBase.py +++ b/modules/chat/methodBase.py @@ -264,10 +264,12 @@ class MethodBase: success=success, data=data, metadata=metadata or {}, - validation=[], + validation={}, error=error ) def _addValidationMessage(self, result: ActionResult, message: str) -> None: """Add a validation message to the result""" - result.validation.append(message) \ No newline at end of file + if 'messages' not in result.validation: + result.validation['messages'] = [] + result.validation['messages'].append(message) \ No newline at end of file diff --git a/modules/interfaces/interfaceChatModel.py b/modules/interfaces/interfaceChatModel.py index 439fa38a..a3a5ed43 100644 --- a/modules/interfaces/interfaceChatModel.py +++ b/modules/interfaces/interfaceChatModel.py @@ -13,12 +13,73 @@ from modules.shared.attributeUtils import register_model_labels, ModelMixin # ===== Method Models ===== class ActionResult(BaseModel, ModelMixin): - """Model for action results from a methods action""" + """Unified model for action results with workflow state management""" + # Core result fields success: bool = Field(description="Whether the method execution was successful") data: Dict[str, Any] = Field(description="Result data") metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") - validation: List[str] = Field(default_factory=list, description="Validation messages") error: Optional[str] = Field(None, description="Error message if any") + + # Action identification + actionId: Optional[str] = Field(None, description="ID of the action that produced this result") + actionMethod: Optional[str] = Field(None, description="Method of the action that produced this result") + actionName: Optional[str] = Field(None, description="Name of the action that produced this result") + + # Document handling + documents: List[str] = Field(default_factory=list, description="List of document references") + resultLabel: Optional[str] = Field(None, description="Label for the result") + + # Validation and workflow state + validation: Dict[str, Any] = Field(default_factory=dict, description="Validation information") + is_retry: bool = Field(default=False, description="Whether this is a retry attempt") + previous_error: Optional[str] = Field(None, description="Previous error message for retries") + applied_improvements: List[str] = Field(default_factory=list, description="Improvements applied for retry") + + @classmethod + def success(cls, documents: List[str] = None, resultLabel: str = None, data: Dict[str, Any] = None, + actionId: str = None, actionMethod: str = None, actionName: str = None) -> 'ActionResult': + """Create a successful action result""" + return cls( + success=True, + data=data or {}, + documents=documents or [], + resultLabel=resultLabel, + actionId=actionId, + actionMethod=actionMethod, + actionName=actionName + ) + + @classmethod + def failure(cls, error: str, data: Dict[str, Any] = None, + actionId: str = None, actionMethod: str = None, actionName: str = None) -> 'ActionResult': + """Create a failed action result""" + return cls( + success=False, + data=data or {}, + error=error, + actionId=actionId, + actionMethod=actionMethod, + actionName=actionName + ) + + @classmethod + def retry(cls, previous_result: 'ActionResult', improvements: List[str] = None) -> 'ActionResult': + """Create a retry action result based on a previous result""" + return cls( + success=previous_result.success, + data=previous_result.data, + metadata=previous_result.metadata, + validation=previous_result.validation, + error=previous_result.error, + documents=previous_result.documents, + resultLabel=previous_result.resultLabel, + actionId=previous_result.actionId, + actionMethod=previous_result.actionMethod, + actionName=previous_result.actionName, + is_retry=True, + previous_error=previous_result.error, + applied_improvements=improvements or [] + ) # Register labels for ActionResult register_model_labels( @@ -29,7 +90,15 @@ register_model_labels( "data": {"en": "Data", "fr": "Données"}, "metadata": {"en": "Metadata", "fr": "Métadonnées"}, "validation": {"en": "Validation", "fr": "Validation"}, - "error": {"en": "Error", "fr": "Erreur"} + "error": {"en": "Error", "fr": "Erreur"}, + "documents": {"en": "Documents", "fr": "Documents"}, + "resultLabel": {"en": "Result Label", "fr": "Étiquette du résultat"}, + "actionId": {"en": "Action ID", "fr": "ID de l'action"}, + "actionMethod": {"en": "Action Method", "fr": "Méthode de l'action"}, + "actionName": {"en": "Action Name", "fr": "Nom de l'action"}, + "is_retry": {"en": "Is Retry", "fr": "Est une nouvelle tentative"}, + "previous_error": {"en": "Previous Error", "fr": "Erreur précédente"}, + "applied_improvements": {"en": "Applied Improvements", "fr": "Améliorations appliquées"} } ) @@ -484,20 +553,6 @@ class TaskContext(BaseModel, ModelMixin): failed_actions: Optional[list] = [] successful_actions: Optional[list] = [] -class ActionExecutionResult(BaseModel, ModelMixin): - success: bool - data: dict - metadata: dict = {} - error: Optional[str] = None - actionId: Optional[str] = None - actionMethod: Optional[str] = None - actionName: Optional[str] = None - documents: Optional[list] = [] - validation: Optional[dict] = {} - is_retry: Optional[bool] = False - previous_error: Optional[str] = None - applied_improvements: Optional[list[str]] = [] - class ReviewContext(BaseModel, ModelMixin): task_step: TaskStep task_actions: Optional[list] = [] diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py index 7f9f78fc..436c8bbe 100644 --- a/modules/interfaces/interfaceChatObjects.py +++ b/modules/interfaces/interfaceChatObjects.py @@ -1225,7 +1225,7 @@ class ChatObjects: success=createdResult.get("success", False), data=createdResult.get("data", {}), metadata=createdResult.get("metadata", {}), - validation=createdResult.get("validation", []), + validation=createdResult.get("validation", {}), error=createdResult.get("error") ) diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py index 26b03752..9e0b6dba 100644 --- a/modules/methods/methodDocument.py +++ b/modules/methods/methodDocument.py @@ -217,7 +217,7 @@ class MethodDocument(MethodBase): ) # Generate HTML report - html_content = self._generateHtmlReport(chatDocuments, title, includeMetadata) + html_content = await self._generateHtmlReport(chatDocuments, title, includeMetadata) # Create output filename timestamp = datetime.now(UTC).strftime('%Y%m%d_%H%M%S') @@ -250,7 +250,7 @@ class MethodDocument(MethodBase): error=str(e) ) - def _generateHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool) -> str: + async def _generateHtmlReport(self, chatDocuments: List[Any], title: str, includeMetadata: bool) -> str: """ Generate a comprehensive HTML report using AI from all input documents. """ @@ -304,7 +304,7 @@ class MethodDocument(MethodBase): # Call AI to generate the report logger.info(f"Generating AI report for {len(validDocuments)} documents") - aiReport = self.service.callAiTextBasic(aiPrompt, combinedContent) + aiReport = await self.service.callAiTextBasic(aiPrompt, combinedContent) # If AI call fails, fall back to basic HTML if not aiReport or aiReport.strip() == "": diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py index 945e3aa2..78123c46 100644 --- a/modules/workflow/managerWorkflow.py +++ b/modules/workflow/managerWorkflow.py @@ -37,7 +37,7 @@ class WorkflowManager: await self.chatManager.initialize(workflow) # Set user language - self.chatManager.setUserLanguage(userInput.userLanguage) + self.chatManager.service.setUserLanguage(userInput.userLanguage) # Send first message message = await self._sendFirstMessage(userInput, workflow) @@ -121,7 +121,7 @@ class WorkflowManager: # Add documents if any if userInput.listFileId: # Process file IDs and add to message data - documents = await self.chatManager.processFileIds(userInput.listFileId) + documents = await self.chatManager.service.processFileIds(userInput.listFileId) messageData["documents"] = documents # Create message using interface