From 86fe43e987b61ee5dfe2cbece7f762694759d001 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Thu, 10 Jul 2025 16:13:05 +0200
Subject: [PATCH] system test
---
README_document_test.md | 114 ++
modules/connectors/connectorAiOpenai.py | 53 +-
modules/connectors/connectorDbJson.py | 29 +-
modules/interfaces/interfaceAiCalls.py | 2 +-
modules/interfaces/interfaceAppObjects.py | 1 -
modules/interfaces/interfaceChatObjects.py | 2 +-
.../interfaces/interfaceComponentObjects.py | 35 +-
modules/methods/methodCoder.py | 18 +-
modules/methods/methodDocument.py | 245 +---
modules/routes/routeDataFiles.py | 16 +-
modules/routes/routeDataPrompts.py | 6 +-
modules/routes/routeWorkflows.py | 125 +-
modules/workflow/managerChat.py | 1012 ++++++++++++++---
modules/workflow/managerDocument.py | 4 +-
modules/workflow/managerWorkflow.py | 91 ++
modules/workflow/processorDocument.py | 251 ++--
modules/workflow/serviceContainer.py | 178 ++-
run_document_test.ps1 | 31 +
test_document_extraction.py | 288 +++++
test_retry_enhancement.py | 289 +++++
test_sample_document.txt | 47 +
21 files changed, 2247 insertions(+), 590 deletions(-)
create mode 100644 README_document_test.md
create mode 100644 run_document_test.ps1
create mode 100644 test_document_extraction.py
create mode 100644 test_retry_enhancement.py
create mode 100644 test_sample_document.txt
diff --git a/README_document_test.md b/README_document_test.md
new file mode 100644
index 00000000..7bf052f2
--- /dev/null
+++ b/README_document_test.md
@@ -0,0 +1,114 @@
+# Document Extraction Test
+
+This test procedure validates the DocumentManager's ability to extract content from files using AI-powered analysis.
+
+## Files Created
+
+- `test_document_extraction.py` - Main test script
+- `test_sample_document.txt` - Sample document for testing
+- `run_document_test.ps1` - PowerShell wrapper script
+- `test_document_extraction.log` - Generated log file (cleared on each run)
+
+## Usage
+
+### Method 1: Using PowerShell Script (Recommended)
+
+```powershell
+# Test with default sample file
+.\run_document_test.ps1
+
+# Test with custom file
+.\run_document_test.ps1 "path\to\your\document.pdf"
+```
+
+### Method 2: Direct Python Execution
+
+```bash
+# Test with default sample file
+python test_document_extraction.py test_sample_document.txt
+
+# Test with custom file
+python test_document_extraction.py "path/to/your/document.docx"
+```
+
+## Test Features
+
+1. **File Validation**: Checks if the specified file exists
+2. **MIME Type Detection**: Automatically detects file type based on extension
+3. **Content Extraction**: Uses the DocumentManager to extract content
+4. **AI Processing**: Applies the prompt "summarize the content and give list of the major topics"
+5. **Comprehensive Logging**: Logs all steps and results to `test_document_extraction.log`
+6. **Log Cleanup**: Clears the log file on each test run
+
+## Supported File Types
+
+- Text files (.txt, .md)
+- CSV files (.csv)
+- JSON files (.json)
+- XML files (.xml)
+- HTML files (.html, .htm)
+- Images (.jpg, .jpeg, .png, .gif, .svg)
+- PDF files (.pdf)
+- Office documents (.docx, .xlsx, .pptx)
+- And more (fallback to binary processing)
+
+## Test Output
+
+The test generates detailed logs including:
+
+- File information (path, size, MIME type)
+- Extraction process details
+- Extracted content summary
+- AI-processed results
+- Error details if any issues occur
+
+## Example Output
+
+```
+=== STARTING DOCUMENT EXTRACTION TEST ===
+File information: {
+ "file_path": "test_sample_document.txt",
+ "filename": "test_sample_document.txt",
+ "mime_type": "text/plain",
+ "file_size_bytes": 2048,
+ "file_size_mb": 0.0
+}
+Document extraction completed successfully: {
+ "extracted_content_id": "test-doc-1234567890",
+ "content_items_count": 1,
+ "object_type": "ExtractedContent"
+}
+COMPLETE EXTRACTED CONTENT: {
+ "total_length": 1500,
+ "content": "PowerOn System Architecture Overview... [AI processed summary]"
+}
+```
+
+## Error Handling
+
+The test includes comprehensive error handling for:
+
+- File not found errors
+- File reading errors
+- Document processing errors
+- AI processing errors
+- Import errors
+
+All errors are logged with detailed information for debugging.
+
+## Configuration
+
+The test uses the same configuration as other tests:
+
+- Environment variable: `POWERON_CONFIG_FILE = 'test_config.ini'`
+- Log file: `test_document_extraction.log`
+- Log level: DEBUG
+
+## Dependencies
+
+The test requires the same dependencies as the main PowerOn system:
+
+- Python 3.8+
+- Required Python packages (see requirements.txt)
+- Access to AI services (if AI processing is enabled)
+- Proper configuration in test_config.ini
\ No newline at end of file
diff --git a/modules/connectors/connectorAiOpenai.py b/modules/connectors/connectorAiOpenai.py
index 847380cd..b81991d3 100644
--- a/modules/connectors/connectorAiOpenai.py
+++ b/modules/connectors/connectorAiOpenai.py
@@ -98,7 +98,27 @@ class AiOpenai:
The response from the OpenAI Vision API as text
"""
try:
- logger.debug(f"Starting image analysis for {mimeType} with query '{prompt}' for {mimeType} size {len(imageData)}B...")
+ logger.debug(f"Starting image analysis with query '{prompt}' for size {len(imageData)}B...")
+
+ # Ensure imageData is a string (base64 encoded)
+ if not isinstance(imageData, str):
+ raise ValueError("imageData must be a string (base64 encoded)")
+
+ # Fix base64 padding if needed
+ padding_needed = len(imageData) % 4
+ if padding_needed:
+ imageData += '=' * (4 - padding_needed)
+
+ # Use default MIME type if not provided
+ if not mimeType:
+ mimeType = "image/jpeg"
+
+ logger.debug(f"Using MIME type: {mimeType}")
+ logger.debug(f"Base64 data length: {len(imageData)} characters")
+
+ # Create the data URL format as required by OpenAI Vision API
+ data_url = f"data:{mimeType};base64,{imageData}"
+
messages = [
{
"role": "user",
@@ -107,15 +127,40 @@ class AiOpenai:
{
"type": "image_url",
"image_url": {
- "url": f"data:{mimeType};base64,{imageData}"
+ "url": data_url
}
}
]
}
]
- # Use the existing callApi function with the Vision model
- response = await self.callApi(messages)
+ # Use a vision-capable model for image analysis
+ # Override the model for vision tasks
+ visionModel = "gpt-4o" # or "gpt-4-vision-preview" depending on availability
+
+ # Use parameters from configuration
+ temperature = self.config.get("temperature", 0.2)
+ maxTokens = self.config.get("maxTokens", 2000)
+
+ payload = {
+ "model": visionModel,
+ "messages": messages,
+ "temperature": temperature,
+ "max_tokens": maxTokens
+ }
+
+ response = await self.httpClient.post(
+ self.apiUrl,
+ json=payload
+ )
+
+ if response.status_code != 200:
+ logger.error(f"OpenAI API error: {response.status_code} - {response.text}")
+ raise HTTPException(status_code=500, detail="Error communicating with OpenAI API")
+
+ responseJson = response.json()
+ content = responseJson["choices"][0]["message"]["content"]
+ return content
# Return content
return response
diff --git a/modules/connectors/connectorDbJson.py b/modules/connectors/connectorDbJson.py
index 0d6658ae..d6ebc877 100644
--- a/modules/connectors/connectorDbJson.py
+++ b/modules/connectors/connectorDbJson.py
@@ -173,13 +173,31 @@ class DatabaseConnector:
record["_modifiedAt"] = currentTime.isoformat()
record["_modifiedBy"] = self.userId
- # Save the record file
+ # Save the record file using atomic write
recordPath = self._getRecordPath(table, recordId)
+ tempPath = recordPath + '.tmp'
+
+ # Ensure directory exists
os.makedirs(os.path.dirname(recordPath), exist_ok=True)
- with open(recordPath, 'w', encoding='utf-8') as f:
+ # Write to temporary file first
+ with open(tempPath, 'w', encoding='utf-8') as f:
json.dump(record, f, indent=2, ensure_ascii=False)
+ # Verify the temporary file can be read back (validation)
+ try:
+ with open(tempPath, 'r', encoding='utf-8') as f:
+ json.load(f) # This will fail if file is corrupted
+ except Exception as e:
+ logger.error(f"Validation failed for record {recordId}: {e}")
+ # Clean up temp file
+ if os.path.exists(tempPath):
+ os.remove(tempPath)
+ raise ValueError(f"Record validation failed: {e}")
+
+ # Atomic move from temp to final location
+ os.replace(tempPath, recordPath)
+
# Update metadata
metadata = self._loadTableMetadata(table)
if recordId not in metadata["recordIds"]:
@@ -203,6 +221,13 @@ class DatabaseConnector:
except Exception as e:
logger.error(f"Error saving record {recordId} to table {table}: {e}")
+ # Clean up temp file if it exists
+ tempPath = self._getRecordPath(table, recordId) + '.tmp'
+ if os.path.exists(tempPath):
+ try:
+ os.remove(tempPath)
+ except:
+ pass
return False
def _loadTable(self, table: str) -> List[Dict[str, Any]]:
diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py
index 79a662af..424d1d36 100644
--- a/modules/interfaces/interfaceAiCalls.py
+++ b/modules/interfaces/interfaceAiCalls.py
@@ -116,7 +116,7 @@ class AiCalls:
The AI response as text
"""
try:
- return await self.openaiService.callAiImage(imageData, mimeType, prompt)
+ return await self.openaiService.callAiImage(prompt, imageData, mimeType)
except Exception as e:
logger.error(f"Error in OpenAI image call: {str(e)}")
return f"Error: {str(e)}"
diff --git a/modules/interfaces/interfaceAppObjects.py b/modules/interfaces/interfaceAppObjects.py
index 92c55834..a89e2f5a 100644
--- a/modules/interfaces/interfaceAppObjects.py
+++ b/modules/interfaces/interfaceAppObjects.py
@@ -237,7 +237,6 @@ class AppObjects:
# Find user by username
for user_dict in users:
if user_dict.get("username") == username:
- logger.info(f"Found user with username {username}")
return User.from_dict(user_dict)
logger.info(f"No user found with username {username}")
diff --git a/modules/interfaces/interfaceChatObjects.py b/modules/interfaces/interfaceChatObjects.py
index 002e8370..5bb48da3 100644
--- a/modules/interfaces/interfaceChatObjects.py
+++ b/modules/interfaces/interfaceChatObjects.py
@@ -760,7 +760,7 @@ class ChatObjects:
else:
# Create new workflow
workflowData = {
- "name": userInput.name or "New Workflow",
+ "name": "New Workflow", # Default name since UserInputRequest doesn't have a name field
"status": "running",
"startedAt": currentTime,
"lastActivity": currentTime,
diff --git a/modules/interfaces/interfaceComponentObjects.py b/modules/interfaces/interfaceComponentObjects.py
index 7e6f0c4d..0041da96 100644
--- a/modules/interfaces/interfaceComponentObjects.py
+++ b/modules/interfaces/interfaceComponentObjects.py
@@ -690,34 +690,39 @@ class ComponentObjects:
return None
# Process content based on file type
- contentType = "binary"
+ isText = False
content = ""
+ encoding = None
- if file.get("mimeType", "").startswith("text/"):
+ # Use proper attribute access for FileItem object
+ if file.mimeType.startswith("text/"):
# For text files, return full content
try:
content = fileContent.decode('utf-8')
- contentType = "text"
+ isText = True
+ encoding = 'utf-8'
except UnicodeDecodeError:
content = fileContent.decode('latin-1')
- contentType = "text"
- elif file.get("mimeType", "").startswith("image/"):
+ isText = True
+ encoding = 'latin-1'
+ elif file.mimeType.startswith("image/"):
# For images, return base64
- contentType = "base64"
- content = f"data:{file['mimeType']};base64,{fileContent.hex()}"
+ import base64
+ content = base64.b64encode(fileContent).decode('utf-8')
+ isText = False
else:
# For other files, return as base64
- contentType = "base64"
- content = f"data:{file['mimeType']};base64,{fileContent.hex()}"
+ import base64
+ content = base64.b64encode(fileContent).decode('utf-8')
+ isText = False
return FilePreview(
- id=fileId,
- name=file.get("name", "Unknown"),
- mimeType=file.get("mimeType", "application/octet-stream"),
- size=file.get("size", 0),
content=content,
- contentType=contentType,
- metadata=file.get("metadata", {})
+ mimeType=file.mimeType,
+ filename=file.filename,
+ isText=isText,
+ encoding=encoding,
+ size=file.fileSize
)
except Exception as e:
diff --git a/modules/methods/methodCoder.py b/modules/methods/methodCoder.py
index e0e09bb4..d9cc5289 100644
--- a/modules/methods/methodCoder.py
+++ b/modules/methods/methodCoder.py
@@ -1,4 +1,4 @@
-from typing import Dict, Any, Optional
+from typing import Dict, Any, Optional, List
import logging
import uuid
from datetime import datetime, UTC
@@ -11,10 +11,11 @@ class MethodCoder(MethodBase):
"""Coder method implementation for code operations"""
def __init__(self, serviceContainer: Any):
+ """Initialize the coder method"""
super().__init__(serviceContainer)
self.name = "coder"
- self.description = "Handle code operations like analysis and generation"
-
+ self.description = "Handle code operations like analysis, generation, and refactoring"
+
@action
async def analyze(self, parameters: Dict[str, Any]) -> ActionResult:
"""
@@ -55,7 +56,7 @@ class MethodCoder(MethodBase):
error="No documents found for the provided reference"
)
- # Extract content from all documents
+ # Process each document individually
all_code_content = []
for chatDocument in chatDocuments:
@@ -85,15 +86,18 @@ class MethodCoder(MethodBase):
error="No code content could be extracted from any documents"
)
- # Combine all code content for analysis
- combined_code = "\n\n--- CODE SEPARATOR ---\n\n".join(all_code_content)
+ # Extract text content from ExtractedContent objects
+ text_contents = self.service.extractTextFromContentObjects(all_code_content)
+
+ # Combine all extracted text content for analysis
+ combined_content = "\n\n--- CODE SEPARATOR ---\n\n".join(text_contents)
# Create analysis prompt
analysis_prompt = f"""
Analyze this {language} code for quality, structure, and potential issues.
Code to analyze:
- {combined_code}
+ {combined_content}
Please check for:
{', '.join(checks)}
diff --git a/modules/methods/methodDocument.py b/modules/methods/methodDocument.py
index dee445b0..a1b437de 100644
--- a/modules/methods/methodDocument.py
+++ b/modules/methods/methodDocument.py
@@ -26,18 +26,16 @@ class MethodDocument(MethodBase):
@action
async def extract(self, parameters: Dict[str, Any]) -> ActionResult:
"""
- Extract content from document
+ Extract specific content from document with ai prompt and return it as a json file
Parameters:
documentList (str): Reference to the document list to extract content from
aiPrompt (str): AI prompt for content extraction
- format (str, optional): Output format (default: "text")
includeMetadata (bool, optional): Whether to include metadata (default: True)
"""
try:
documentList = parameters.get("documentList")
aiPrompt = parameters.get("aiPrompt")
- format = parameters.get("format", "text")
includeMetadata = parameters.get("includeMetadata", True)
if not documentList:
@@ -95,12 +93,14 @@ class MethodDocument(MethodBase):
error="No content could be extracted from any documents"
)
- # Combine all extracted content
- combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(all_extracted_content)
+ # Extract text content from ExtractedContent objects
+ text_contents = self.service.extractTextFromContentObjects(all_extracted_content)
+
+ # Combine all extracted text content
+ combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(text_contents)
result_data = {
"documentCount": len(chatDocuments),
- "format": format,
"content": combined_content,
"fileInfos": file_infos if includeMetadata else None,
"timestamp": datetime.now(UTC).isoformat()
@@ -124,236 +124,3 @@ class MethodDocument(MethodBase):
data={},
error=str(e)
)
-
- @action
- async def analyze(self, parameters: Dict[str, Any]) -> ActionResult:
- """
- Analyze document content
-
- Parameters:
- documentList (str): Reference to the document list to analyze
- aiPrompt (str): AI prompt for content analysis
- analysis (List[str], optional): Types of analysis to perform (default: ["entities", "topics", "sentiment"])
- """
- try:
- documentList = parameters.get("documentList")
- aiPrompt = parameters.get("aiPrompt")
- analysis = parameters.get("analysis", ["entities", "topics", "sentiment"])
-
- if not documentList:
- return self._createResult(
- success=False,
- data={},
- error="Document list reference is required"
- )
-
- if not aiPrompt:
- return self._createResult(
- success=False,
- data={},
- error="AI prompt is required"
- )
-
- chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
- if not chatDocuments:
- return self._createResult(
- success=False,
- data={},
- error="No documents found for the provided reference"
- )
-
- # Extract content from all documents
- all_extracted_content = []
-
- for chatDocument in chatDocuments:
- fileId = chatDocument.fileId
- file_data = self.service.getFileData(fileId)
- file_info = self.service.getFileInfo(fileId)
-
- if not file_data:
- logger.warning(f"File not found or empty for fileId: {fileId}")
- continue
-
- extracted_content = await self.service.extractContentFromFileData(
- prompt=aiPrompt,
- fileData=file_data,
- filename=file_info.get('name', 'document'),
- mimeType=file_info.get('mimeType', 'application/octet-stream'),
- base64Encoded=False,
- documentId=chatDocument.id
- )
-
- all_extracted_content.append(extracted_content)
-
- if not all_extracted_content:
- return self._createResult(
- success=False,
- data={},
- error="No content could be extracted from any documents"
- )
-
- # Combine all extracted content for analysis
- combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(all_extracted_content)
-
- analysis_prompt = f"""
- Analyze this document content for the following aspects:
- {', '.join(analysis)}
-
- Document content:
- {combined_content[:8000]} # Limit content length
-
- Please provide a detailed analysis including:
- 1. Key entities (people, organizations, locations, dates)
- 2. Main topics and themes
- 3. Sentiment analysis (positive, negative, neutral)
- 4. Key insights and patterns
- 5. Important relationships between entities
- 6. Document structure and organization
- """
-
- analysis_result = await self.service.interfaceAiCalls.callAiTextAdvanced(analysis_prompt)
-
- result_data = {
- "documentCount": len(chatDocuments),
- "analysis": analysis,
- "results": analysis_result,
- "content": combined_content,
- "timestamp": datetime.now(UTC).isoformat()
- }
-
- return self._createResult(
- success=True,
- data={
- "documents": [
- {
- "documentName": f"document_analysis_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
- "documentData": result_data
- }
- ]
- }
- )
- except Exception as e:
- logger.error(f"Error analyzing content: {str(e)}")
- return self._createResult(
- success=False,
- data={},
- error=str(e)
- )
-
- @action
- async def summarize(self, parameters: Dict[str, Any]) -> ActionResult:
- """
- Summarize document content
-
- Parameters:
- documentList (str): Reference to the document list to summarize
- aiPrompt (str): AI prompt for content extraction
- maxLength (int, optional): Maximum length of summary in words (default: 200)
- format (str, optional): Output format (default: "text")
- """
- try:
- documentList = parameters.get("documentList")
- aiPrompt = parameters.get("aiPrompt")
- maxLength = parameters.get("maxLength", 200)
- format = parameters.get("format", "text")
-
- if not documentList:
- return self._createResult(
- success=False,
- data={},
- error="Document list reference is required"
- )
-
- if not aiPrompt:
- return self._createResult(
- success=False,
- data={},
- error="AI prompt is required"
- )
-
- chatDocuments = self.service.getChatDocumentsFromDocumentList(documentList)
- if not chatDocuments:
- return self._createResult(
- success=False,
- data={},
- error="No documents found for the provided reference"
- )
-
- # Extract content from all documents
- all_extracted_content = []
-
- for chatDocument in chatDocuments:
- fileId = chatDocument.fileId
- file_data = self.service.getFileData(fileId)
- file_info = self.service.getFileInfo(fileId)
-
- if not file_data:
- logger.warning(f"File not found or empty for fileId: {fileId}")
- continue
-
- extracted_content = await self.service.extractContentFromFileData(
- prompt=aiPrompt,
- fileData=file_data,
- filename=file_info.get('name', 'document'),
- mimeType=file_info.get('mimeType', 'application/octet-stream'),
- base64Encoded=False,
- documentId=chatDocument.id
- )
-
- all_extracted_content.append(extracted_content)
-
- if not all_extracted_content:
- return self._createResult(
- success=False,
- data={},
- error="No content could be extracted from any documents"
- )
-
- # Combine all extracted content for summarization
- combined_content = "\n\n--- DOCUMENT SEPARATOR ---\n\n".join(all_extracted_content)
-
- summary_prompt = f"""
- Create a comprehensive summary of this document content.
-
- Document content:
- {combined_content[:8000]} # Limit content length
-
- Requirements:
- - Maximum length: {maxLength} words
- - Format: {format}
- - Include key points and main ideas
- - Maintain accuracy and completeness
- - Use clear, professional language
- - Highlight important insights and conclusions
- """
-
- summary = await self.service.interfaceAiCalls.callAiTextAdvanced(summary_prompt)
-
- result_data = {
- "documentCount": len(chatDocuments),
- "maxLength": maxLength,
- "format": format,
- "summary": summary,
- "wordCount": len(summary.split()),
- "originalContent": combined_content,
- "timestamp": datetime.now(UTC).isoformat()
- }
-
- return self._createResult(
- success=True,
- data={
- "documents": [
- {
- "documentName": f"document_summary_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.txt",
- "documentData": result_data
- }
- ]
- }
- )
- except Exception as e:
- logger.error(f"Error summarizing content: {str(e)}")
- return self._createResult(
- success=False,
- data={},
- error=str(e)
- )
diff --git a/modules/routes/routeDataFiles.py b/modules/routes/routeDataFiles.py
index 26f63bd0..3119734f 100644
--- a/modules/routes/routeDataFiles.py
+++ b/modules/routes/routeDataFiles.py
@@ -133,7 +133,7 @@ async def get_file(
detail=f"File with ID {fileId} not found"
)
- return FileItem(**fileData)
+ return fileData
except interfaceComponentObjects.FileNotFoundError as e:
logger.warning(f"File not found: {str(e)}")
@@ -180,8 +180,8 @@ async def update_file(
detail=f"File with ID {fileId} not found"
)
- # Check if user has access to the file
- if file.get("userId", 0) != currentUser.get("id", 0):
+ # Check if user has access to the file using the interface's permission system
+ if not managementInterface._canModify("files", fileId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to update this file"
@@ -195,9 +195,9 @@ async def update_file(
detail="Failed to update file"
)
- # Get updated file and convert to FileItem
+ # Get updated file
updatedFile = managementInterface.getFile(fileId)
- return FileItem(**updatedFile)
+ return updatedFile
except HTTPException as he:
raise he
@@ -328,15 +328,15 @@ async def preview_file(
try:
managementInterface = interfaceComponentObjects.getInterface(currentUser)
- # Get file preview
- preview = managementInterface.getFilePreview(fileId)
+ # Get file preview using the correct method
+ preview = managementInterface.getFileContent(fileId)
if not preview:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"File with ID {fileId} not found or no content available"
)
- return FilePreview(**preview)
+ return preview
except HTTPException:
raise
except Exception as e:
diff --git a/modules/routes/routeDataPrompts.py b/modules/routes/routeDataPrompts.py
index b4809151..c2e58771 100644
--- a/modules/routes/routeDataPrompts.py
+++ b/modules/routes/routeDataPrompts.py
@@ -54,7 +54,7 @@ async def create_prompt(
# Create prompt
newPrompt = managementInterface.createPrompt(prompt_data)
- return Prompt.from_dict(newPrompt)
+ return Prompt(**newPrompt)
@router.get("/{promptId}", response_model=Prompt)
@limiter.limit("30/minute")
@@ -74,7 +74,7 @@ async def get_prompt(
detail=f"Prompt with ID {promptId} not found"
)
- return Prompt.from_dict(prompt)
+ return prompt
@router.put("/{promptId}", response_model=Prompt)
@limiter.limit("10/minute")
@@ -107,7 +107,7 @@ async def update_prompt(
detail="Error updating the prompt"
)
- return Prompt.from_dict(updatedPrompt)
+ return Prompt(**updatedPrompt)
@router.delete("/{promptId}", response_model=Dict[str, Any])
@limiter.limit("10/minute")
diff --git a/modules/routes/routeWorkflows.py b/modules/routes/routeWorkflows.py
index a5258f9d..de39a1c4 100644
--- a/modules/routes/routeWorkflows.py
+++ b/modules/routes/routeWorkflows.py
@@ -48,7 +48,7 @@ def getServiceChat(currentUser: User):
# Consolidated endpoint for getting all workflows
@router.get("/", response_model=List[ChatWorkflow])
-@limiter.limit("30/minute")
+@limiter.limit("120/minute")
async def get_workflows(
request: Request,
currentUser: User = Depends(getCurrentUser)
@@ -56,7 +56,31 @@ async def get_workflows(
"""Get all workflows for the current user."""
try:
appInterface = getInterface(currentUser)
- return appInterface.getAllWorkflows()
+ workflows_data = appInterface.getAllWorkflows()
+
+ # Convert raw dictionaries to ChatWorkflow objects
+ workflows = []
+ for workflow_data in workflows_data:
+ try:
+ workflow = ChatWorkflow(
+ id=workflow_data["id"],
+ status=workflow_data.get("status", "running"),
+ name=workflow_data.get("name"),
+ currentRound=workflow_data.get("currentRound", 1),
+ lastActivity=workflow_data.get("lastActivity", appInterface._getCurrentTimestamp()),
+ startedAt=workflow_data.get("startedAt", appInterface._getCurrentTimestamp()),
+ logs=[ChatLog(**log) for log in workflow_data.get("logs", [])],
+ messages=[ChatMessage(**msg) for msg in workflow_data.get("messages", [])],
+ stats=ChatStat(**workflow_data.get("dataStats", {})) if workflow_data.get("dataStats") else None,
+ mandateId=workflow_data.get("mandateId", currentUser.mandateId or "")
+ )
+ workflows.append(workflow)
+ except Exception as e:
+ logger.warning(f"Error converting workflow data to ChatWorkflow object: {str(e)}")
+ # Skip invalid workflows instead of failing the entire request
+ continue
+
+ return workflows
except Exception as e:
logger.error(f"Error getting workflows: {str(e)}")
raise HTTPException(
@@ -65,7 +89,7 @@ async def get_workflows(
)
@router.get("/{workflowId}", response_model=ChatWorkflow)
-@limiter.limit("30/minute")
+@limiter.limit("120/minute")
async def get_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
@@ -93,9 +117,58 @@ async def get_workflow(
detail=f"Failed to get workflow: {str(e)}"
)
+@router.put("/{workflowId}", response_model=ChatWorkflow)
+@limiter.limit("120/minute")
+async def update_workflow(
+ request: Request,
+ workflowId: str = Path(..., description="ID of the workflow to update"),
+ workflowData: Dict[str, Any] = Body(...),
+ currentUser: User = Depends(getCurrentUser)
+) -> ChatWorkflow:
+ """Update workflow by ID"""
+ try:
+ # Get workflow interface with current user context
+ workflowInterface = getInterface(currentUser)
+
+ # Get raw workflow data from database to check permissions
+ workflows = workflowInterface.db.getRecordset("workflows", recordFilter={"id": workflowId})
+ if not workflows:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail="Workflow not found"
+ )
+
+ workflow_data = workflows[0]
+
+ # Check if user has permission to update using the interface's permission system
+ if not workflowInterface._canModify("workflows", workflowId):
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail="You don't have permission to update this workflow"
+ )
+
+ # Update workflow
+ updatedWorkflow = workflowInterface.updateWorkflow(workflowId, workflowData)
+ if not updatedWorkflow:
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail="Failed to update workflow"
+ )
+
+ return updatedWorkflow
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error updating workflow: {str(e)}")
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail=f"Failed to update workflow: {str(e)}"
+ )
+
# API Endpoint for workflow status
@router.get("/{workflowId}/status", response_model=ChatWorkflow)
-@limiter.limit("30/minute")
+@limiter.limit("120/minute")
async def get_workflow_status(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
@@ -114,7 +187,7 @@ async def get_workflow_status(
detail=f"Workflow with ID {workflowId} not found"
)
- return ChatWorkflow(**workflow)
+ return workflow
except HTTPException:
raise
except Exception as e:
@@ -126,7 +199,7 @@ async def get_workflow_status(
# API Endpoint for workflow logs with selective data transfer
@router.get("/{workflowId}/logs", response_model=List[ChatLog])
-@limiter.limit("30/minute")
+@limiter.limit("120/minute")
async def get_workflow_logs(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
@@ -152,12 +225,12 @@ async def get_workflow_logs(
# Apply selective data transfer if logId is provided
if logId:
# Find the index of the log with the given ID
- logIndex = next((i for i, log in enumerate(allLogs) if log.get("id") == logId), -1)
+ logIndex = next((i for i, log in enumerate(allLogs) if log.id == logId), -1)
if logIndex >= 0:
# Return only logs after the specified log
- return [ChatLog(**log) for log in allLogs[logIndex + 1:]]
+ return allLogs[logIndex + 1:]
- return [ChatLog(**log) for log in allLogs]
+ return allLogs
except HTTPException:
raise
except Exception as e:
@@ -169,7 +242,7 @@ async def get_workflow_logs(
# API Endpoint for workflow messages with selective data transfer
@router.get("/{workflowId}/messages", response_model=List[ChatMessage])
-@limiter.limit("30/minute")
+@limiter.limit("120/minute")
async def get_workflow_messages(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
@@ -195,12 +268,12 @@ async def get_workflow_messages(
# Apply selective data transfer if messageId is provided
if messageId:
# Find the index of the message with the given ID
- messageIndex = next((i for i, msg in enumerate(allMessages) if msg.get("id") == messageId), -1)
+ messageIndex = next((i for i, msg in enumerate(allMessages) if msg.id == messageId), -1)
if messageIndex >= 0:
# Return only messages after the specified message
- return [ChatMessage(**msg) for msg in allMessages[messageIndex + 1:]]
+ return allMessages[messageIndex + 1:]
- return [ChatMessage(**msg) for msg in allMessages]
+ return allMessages
except HTTPException:
raise
except Exception as e:
@@ -212,7 +285,7 @@ async def get_workflow_messages(
# State 1: Workflow Initialization endpoint
@router.post("/start", response_model=ChatWorkflow)
-@limiter.limit("10/minute")
+@limiter.limit("120/minute")
async def start_workflow(
request: Request,
workflowId: Optional[str] = Query(None, description="Optional ID of the workflow to continue"),
@@ -230,7 +303,7 @@ async def start_workflow(
# Start or continue workflow using ChatObjects
workflow = await interfaceChat.workflowStart(currentUser, userInput, workflowId)
- return ChatWorkflow(**workflow)
+ return workflow
except Exception as e:
logger.error(f"Error in start_workflow: {str(e)}")
@@ -241,7 +314,7 @@ async def start_workflow(
# State 8: Workflow Stopped endpoint
@router.post("/{workflowId}/stop", response_model=ChatWorkflow)
-@limiter.limit("10/minute")
+@limiter.limit("120/minute")
async def stop_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow to stop"),
@@ -255,7 +328,7 @@ async def stop_workflow(
# Stop workflow using ChatObjects
workflow = await interfaceChat.workflowStop(workflowId)
- return ChatWorkflow(**workflow)
+ return workflow
except Exception as e:
logger.error(f"Error in stop_workflow: {str(e)}")
@@ -266,7 +339,7 @@ async def stop_workflow(
# State 11: Workflow Reset/Deletion endpoint
@router.delete("/{workflowId}", response_model=Dict[str, Any])
-@limiter.limit("10/minute")
+@limiter.limit("120/minute")
async def delete_workflow(
request: Request,
workflowId: str = Path(..., description="ID of the workflow to delete"),
@@ -277,16 +350,18 @@ async def delete_workflow(
# Get service container
interfaceChat = getServiceChat(currentUser)
- # Verify workflow exists
- workflow = interfaceChat.getWorkflow(workflowId)
- if not workflow:
+ # Get raw workflow data from database to check permissions
+ workflows = interfaceChat.db.getRecordset("workflows", recordFilter={"id": workflowId})
+ if not workflows:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow with ID {workflowId} not found"
)
- # Check if user has permission to delete
- if workflow.get("_userId") != currentUser["id"]:
+ workflow_data = workflows[0]
+
+ # Check if user has permission to delete using the interface's permission system
+ if not interfaceChat._canModify("workflows", workflowId):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to delete this workflow"
@@ -318,7 +393,7 @@ async def delete_workflow(
# Document Management Endpoints
@router.delete("/{workflowId}/messages/{messageId}", response_model=Dict[str, Any])
-@limiter.limit("10/minute")
+@limiter.limit("120/minute")
async def delete_workflow_message(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
@@ -368,7 +443,7 @@ async def delete_workflow_message(
)
@router.delete("/{workflowId}/messages/{messageId}/files/{fileId}", response_model=Dict[str, Any])
-@limiter.limit("10/minute")
+@limiter.limit("120/minute")
async def delete_file_from_message(
request: Request,
workflowId: str = Path(..., description="ID of the workflow"),
diff --git a/modules/workflow/managerChat.py b/modules/workflow/managerChat.py
index d8674429..864a7dc9 100644
--- a/modules/workflow/managerChat.py
+++ b/modules/workflow/managerChat.py
@@ -78,20 +78,27 @@ class ChatManager:
})
# Phase 2: Task Definition and Action Generation
- async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None) -> List[TaskAction]:
- """Phase 2: Define specific actions for a task step"""
+ async def defineTaskActions(self, task_step: Dict[str, Any], workflow: ChatWorkflow, previous_results: List[str] = None,
+ enhanced_context: Dict[str, Any] = None) -> List[TaskAction]:
+ """Phase 2: Define specific actions for a task step with enhanced retry context"""
try:
logger.info(f"Defining actions for task: {task_step.get('description', 'Unknown')}")
- # Prepare context for action generation
- context = {
- 'task_step': task_step,
- 'workflow': workflow,
- 'workflow_id': workflow.id,
- 'available_documents': self._getAvailableDocuments(workflow),
- 'previous_results': previous_results or [],
- 'improvements': None
- }
+ # Use enhanced context if provided (for retries), otherwise create basic context
+ if enhanced_context:
+ context = enhanced_context
+ else:
+ context = {
+ 'task_step': task_step,
+ 'workflow': workflow,
+ 'workflow_id': workflow.id,
+ 'available_documents': self._getAvailableDocuments(workflow),
+ 'previous_results': previous_results or [],
+ 'improvements': None,
+ 'retry_count': 0,
+ 'previous_action_results': [],
+ 'previous_review_result': None
+ }
# Generate actions using AI
actions = await self._generateActionsForTaskStep(context)
@@ -369,6 +376,10 @@ class ChatManager:
for dep in dependencies:
if dep not in task_ids and dep != 'task_0': # Allow task_0 as special case
return False
+
+ # Validate ai_prompt if present (optional field)
+ if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str):
+ return False
return True
@@ -381,25 +392,27 @@ class ChatManager:
logger.warning("Creating fallback task plan due to AI generation failure")
return {
- "overview": "Fallback task plan - basic document analysis and processing",
+ "overview": "Fallback task plan - comprehensive document analysis and processing",
"tasks": [
{
"id": "task_1",
- "description": "Analyze all provided documents",
+ "description": "Extract and analyze all provided documents comprehensively",
"dependencies": [],
- "expected_outputs": ["document_analysis"],
- "success_criteria": ["All documents processed"],
+ "expected_outputs": ["comprehensive_document_analysis"],
+ "success_criteria": ["All documents processed and analyzed"],
"required_documents": context.get('available_documents', []),
- "estimated_complexity": "medium"
+ "estimated_complexity": "medium",
+ "ai_prompt": "Extract and analyze all content from the provided documents. Identify key information, patterns, and insights that are relevant to the user's request. Provide a comprehensive analysis that can be used for further processing."
},
{
"id": "task_2",
- "description": "Generate basic output based on analysis",
+ "description": "Generate comprehensive output based on analysis",
"dependencies": ["task_1"],
- "expected_outputs": ["basic_output"],
- "success_criteria": ["Output generated"],
- "required_documents": ["document_analysis"],
- "estimated_complexity": "low"
+ "expected_outputs": ["final_output"],
+ "success_criteria": ["Output generated and formatted appropriately"],
+ "required_documents": ["comprehensive_document_analysis"],
+ "estimated_complexity": "low",
+ "ai_prompt": "Based on the comprehensive document analysis, generate the final output that addresses the user's original request. Format the output appropriately and ensure it meets the user's requirements."
}
]
}
@@ -451,11 +464,14 @@ class ChatManager:
return False
def _createFallbackActions(self, task_step: Dict[str, Any], context: Dict[str, Any]) -> List[Dict[str, Any]]:
- """Create fallback actions when AI generation fails"""
+ """Create fallback actions when AI generation fails with retry context awareness"""
logger.warning("Creating fallback actions due to AI generation failure")
# Get available documents
available_docs = context.get('available_documents', [])
+ retry_count = context.get('retry_count', 0)
+ previous_action_results = context.get('previous_action_results', [])
+
if not available_docs:
logger.warning("No available documents for fallback actions")
return []
@@ -463,15 +479,20 @@ class ChatManager:
# Create fallback actions for document analysis
fallback_actions = []
for i, doc in enumerate(available_docs):
+ # Enhanced AI prompt for retry scenarios
+ ai_prompt = "Fallback document analysis for " + doc
+ if retry_count > 0 and previous_action_results:
+ ai_prompt += f". Previous attempt failed - ensure comprehensive extraction with detailed analysis."
+
fallback_actions.append({
"method": "document",
"action": "analyze",
"parameters": {
"documentList": ["task1_previous_results"],
- "aiPrompt": "Fallback document analysis for " + doc
+ "aiPrompt": ai_prompt
},
- "resultLabel": "task1_fallback:" + doc + ":analysis",
- "description": f"Fallback document analysis for {doc}"
+ "resultLabel": f"task1_fallback_retry{retry_count}:" + doc + ":analysis",
+ "description": f"Fallback document analysis for {doc} (attempt {retry_count + 1})"
})
logger.info(f"Created {len(fallback_actions)} fallback actions")
@@ -489,10 +510,19 @@ AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])}
INSTRUCTIONS:
1. Analyze the user request and available documents
-2. Break down the request into logical task steps
-3. Ensure all documents are properly utilized
-4. Create a sequence that ensures proper handover between tasks
-5. Return a JSON object with the exact structure shown below
+2. Break down the request into 2-4 meaningful high-level task steps
+3. Focus on business outcomes, not technical operations
+4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks
+5. Each task should produce meaningful, usable outputs
+6. Ensure proper handover between tasks using result labels
+7. Return a JSON object with the exact structure shown below
+
+TASK PLANNING PRINCIPLES:
+- Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks)
+- Use comprehensive AI prompts for document processing rather than multiple small tasks
+- Focus on business value and outcomes
+- Keep tasks at a meaningful level of abstraction
+- Each task should produce results that can be used by subsequent tasks
REQUIRED JSON STRUCTURE:
{{
@@ -500,25 +530,40 @@ REQUIRED JSON STRUCTURE:
"tasks": [
{{
"id": "task_1",
- "description": "Clear description of what this task does",
+ "description": "Clear description of what this task accomplishes (business outcome)",
"dependencies": ["task_0"], // IDs of tasks that must complete first
"expected_outputs": ["output1", "output2"],
"success_criteria": ["criteria1", "criteria2"],
"required_documents": ["doc1", "doc2"],
- "estimated_complexity": "low|medium|high"
+ "estimated_complexity": "low|medium|high",
+ "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)"
}}
]
}}
+EXAMPLES OF GOOD TASK DESCRIPTIONS:
+- "Extract and analyze all candidate profiles to identify key qualifications and experience"
+- "Create evaluation matrix and rate candidates against product designer criteria"
+- "Generate comprehensive PowerPoint presentation for management decision"
+- "Store final presentation in SharePoint for specified account"
+
+EXAMPLES OF BAD TASK DESCRIPTIONS:
+- "Open and read the PDF file" (too granular)
+- "Identify table structure" (technical detail)
+- "Convert data to CSV format" (implementation detail)
+
NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
async def _createActionDefinitionPrompt(self, context: Dict[str, Any]) -> str:
- """Create prompt for action generation"""
+ """Create prompt for action generation with enhanced document extraction guidance and retry context"""
task_step = context['task_step']
workflow = context.get('workflow')
available_docs = context['available_documents']
previous_results = context['previous_results']
improvements = context.get('improvements', '')
+ retry_count = context.get('retry_count', 0)
+ previous_action_results = context.get('previous_action_results', [])
+ previous_review_result = context.get('previous_review_result')
# Get available methods and actions with signatures
methodList = self.service.getMethodsList()
@@ -544,6 +589,32 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
for action, sig in actions:
available_methods_str += f" - {action}: {sig}\n"
+ # Get AI prompt from task step if available
+ task_ai_prompt = task_step.get('ai_prompt', '')
+
+ # Build retry context section
+ retry_context = ""
+ if retry_count > 0:
+ retry_context = f"""
+RETRY CONTEXT (Attempt {retry_count}):
+Previous action results that failed or were incomplete:
+"""
+ for i, result in enumerate(previous_action_results):
+ retry_context += f"- Action {i+1}: {result.get('actionMethod', 'unknown')}.{result.get('actionName', 'unknown')}\n"
+ retry_context += f" Status: {result.get('status', 'unknown')}\n"
+ retry_context += f" Error: {result.get('error', 'None')}\n"
+ retry_context += f" Result: {result.get('result', '')[:100]}...\n"
+
+ if previous_review_result:
+ retry_context += f"""
+Previous review feedback:
+- Status: {previous_review_result.get('status', 'unknown')}
+- Reason: {previous_review_result.get('reason', 'No reason provided')}
+- Quality Score: {previous_review_result.get('quality_score', 0)}/10
+- Missing Outputs: {', '.join(previous_review_result.get('missing_outputs', []))}
+- Unmet Criteria: {', '.join(previous_review_result.get('unmet_criteria', []))}
+"""
+
return f"""
You are an action generation AI that creates specific actions to accomplish a task step.
@@ -556,6 +627,7 @@ DOCUMENT REFERENCE TYPES:
TASK STEP: {task_step.get('description', 'Unknown')} (ID: {task_step.get('id', 'Unknown')})
EXPECTED OUTPUTS: {', '.join(task_step.get('expected_outputs', []))}
SUCCESS CRITERIA: {', '.join(task_step.get('success_criteria', []))}
+TASK AI PROMPT: {task_ai_prompt if task_ai_prompt else 'None provided'}
CONTEXT - Chat History:
{messageSummary}
@@ -568,34 +640,48 @@ AVAILABLE CONNECTIONS:
AVAILABLE DOCUMENTS:
{chr(10).join(f"- {doc.documentsLabel} contains {', '.join(doc.documents)}" for doc in all_doc_refs)}
- (Use the label as a value in documentList to refer to the group)
+
+(Use the label as a value in documentList to refer to the group)
PREVIOUS RESULTS: {', '.join(previous_results) if previous_results else 'None'}
-IMPROVEMENTS NEEDED: {improvements if improvements else 'None'}
+IMPROVEMENTS NEEDED: {improvements if improvements else 'None'}{retry_context}
+
+ACTION GENERATION PRINCIPLES:
+- Create meaningful actions per task step
+- Use comprehensive AI prompts for document processing
+- Focus on business outcomes, not technical operations
+- Combine related operations into single actions when possible
+- Use the task's AI prompt if provided, or create a comprehensive one
+- Each action should produce meaningful, usable outputs
+- For document extraction, ensure prompts are specific and detailed
+- Include validation steps in extraction prompts
+- If this is a retry, learn from previous failures and improve the approach
+- Address specific issues mentioned in previous review feedback
INSTRUCTIONS:
-- Generate actions to accomplish this task step using available documents, connections, and previous results.
-- Use docItem for single documents and docList labels for groups of documents as shown in AVAILABLE DOCUMENTS.
-- Always pass documentList as a LIST of references (docItem and/or docList).
+- Generate actions to accomplish this task step using available documents, connections, and previous results
+- Use docItem for single documents and docList labels for groups of documents as shown in AVAILABLE DOCUMENTS
+- Always pass documentList as a LIST of references (docItem and/or docList)
- For resultLabel, use the format: "task{{task_id}}_action{{action_number}}_{{short_label}}" where:
- {{task_id}} = the current task's id (e.g., 1)
- {{action_number}} = the sequence number of the action within the task (e.g., 2)
- {{short_label}} = a short, descriptive label for the output (e.g., "analysis_results")
Example: "task1_action2_analysis_results"
+- If this is a retry, ensure the new actions address the specific issues from previous attempts
- Follow the JSON structure below. All fields are required.
REQUIRED JSON STRUCTURE:
{{
"actions": [
-
+ {{
"method": "method_name", // Use only the method name (e.g., "document")
"action": "action_name", // Use only the action name (e.g., "extract")
"parameters": {{
"documentList": ["docItem:doc_abc:file1.txt", "task1_action2_results"],
- "aiPrompt": "Describe what to do"
+ "aiPrompt": "Comprehensive AI prompt describing what to accomplish"
}},
"resultLabel": "task1_action3_analysis_results",
- "description": "What this action does"
+ "description": "What this action accomplishes (business outcome)"
}}
]
}}
@@ -605,35 +691,34 @@ FIELD REQUIREMENTS:
- "action": Must be valid for the method
- "parameters": Method-specific, must include documentList as a list if required by the signature
- "resultLabel": Must follow the format above (e.g., "task1_action3_analysis_results")
-- "description": Clear summary of the action
+- "description": Clear summary of the business outcome
-EXAMPLES:
-1. Analyze a single document:
+EXAMPLES OF GOOD ACTIONS:
+1. Comprehensive document analysis:
{{
"method": "document",
- "action": "analyze",
+ "action": "extract",
"parameters": {{
"documentList": ["docItem:doc_57520394-6b6d-41c2-b641-bab3fc6d7f4b:candidate_1_profile.txt"],
- "aiPrompt": "Analyze the candidate profile for key insights"
+ "aiPrompt": "Extract and analyze the candidate's qualifications, experience, skills, and suitability for the product designer position. Identify key strengths, relevant experience, technical skills, and any areas of concern. Provide a comprehensive assessment that can be used for evaluation."
}},
- "resultLabel": "task1_action2_candidate_analysis",
- "description": "Analyze candidate profile for insights"
+ "resultLabel": "task1_action1_candidate_analysis",
+ "description": "Comprehensive analysis of candidate profile for evaluation"
}}
-2. Analyze a group of documents (docList):
+2. Multi-document processing:
{{
"method": "document",
- "action": "analyze",
+ "action": "extract",
"parameters": {{
- "documentList": ["task1_action1_extract_results"],
- "aiPrompt": "Analyze all extracted results"
+ "documentList": ["task1_action1_candidate_analysis", "task1_action2_candidate_analysis", "task1_action3_candidate_analysis"],
+ "aiPrompt": "Compare all three candidate profiles and create an evaluation matrix. Rate each candidate on technical skills, experience level, cultural fit, portfolio quality, and communication skills. Provide clear rankings and recommendations for the product designer position."
}},
- "resultLabel": "task1_action2_analysis_results",
- "description": "Analyze all extracted results"
+ "resultLabel": "task1_action4_evaluation_matrix",
+ "description": "Create comprehensive evaluation matrix comparing all candidates"
}}
-NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.
-"""
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
def _createResultReviewPrompt(self, review_context: Dict[str, Any]) -> str:
@@ -641,24 +726,46 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.
task_step = review_context['task_step']
step_result = review_context['step_result']
- # Create serializable version of step_result
+ # Create serializable version of step_result with only metadata (no document content)
step_result_serializable = {
- 'task_step': step_result.get('task_step', {}),
+ 'task_step': {
+ 'id': task_step.get('id', ''),
+ 'description': task_step.get('description', ''),
+ 'expected_outputs': task_step.get('expected_outputs', []),
+ 'success_criteria': task_step.get('success_criteria', [])
+ },
'action_results': [],
'successful_actions': step_result.get('successful_actions', 0),
'total_actions': step_result.get('total_actions', 0),
- 'results': step_result.get('results', []),
- 'errors': step_result.get('errors', [])
+ 'results_count': len(step_result.get('results', [])),
+ 'errors_count': len(step_result.get('errors', []))
}
- # Convert action_results to serializable format
+ # Convert action_results to serializable format with only metadata (no document content)
for action_result in step_result.get('action_results', []):
+ # Extract only document metadata, not content
+ documents_metadata = []
+ for doc in action_result.get('documents', []):
+ if hasattr(doc, 'filename'):
+ documents_metadata.append({
+ 'filename': doc.filename,
+ 'fileSize': getattr(doc, 'fileSize', 0),
+ 'mimeType': getattr(doc, 'mimeType', 'unknown')
+ })
+ elif isinstance(doc, dict):
+ documents_metadata.append({
+ 'filename': doc.get('filename', 'unknown'),
+ 'fileSize': doc.get('fileSize', 0),
+ 'mimeType': doc.get('mimeType', 'unknown')
+ })
+
serializable_action_result = {
'status': action_result.get('status', ''),
- 'result': action_result.get('result', ''),
+ 'result_summary': action_result.get('result', '')[:200] + '...' if len(action_result.get('result', '')) > 200 else action_result.get('result', ''),
'error': action_result.get('error', ''),
'resultLabel': action_result.get('resultLabel', ''),
- 'documents': action_result.get('documents', []),
+ 'documents_count': len(documents_metadata),
+ 'documents_metadata': documents_metadata,
'actionId': action_result.get('actionId', ''),
'actionMethod': action_result.get('actionMethod', ''),
'actionName': action_result.get('actionName', '')
@@ -696,7 +803,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== HELPER METHODS FOR WORKFLOW PHASES =====
async def _generateActionsForTaskStep(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
- """Generate actions for a specific task step"""
+ """Generate actions for a specific task step with enhanced retry context"""
try:
# Prepare prompt for action generation
prompt = await self._createActionDefinitionPrompt(context)
@@ -720,7 +827,7 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
return self._createFallbackActions(context['task_step'], context)
async def _executeSingleAction(self, action: TaskAction, workflow: ChatWorkflow) -> Dict[str, Any]:
- """Execute a single action and return result"""
+ """Execute a single action and return result with enhanced document processing"""
try:
# Execute the actual method action using the service container
result = await self.service.executeAction(
@@ -744,12 +851,68 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
else:
action.setError(result.error or "Action execution failed")
+ # Enhanced result processing with better document handling
+ documents = result.data.get("documents", [])
+ processed_documents = []
+
+ # Process documents with better metadata extraction
+ for doc in documents:
+ if hasattr(doc, 'filename') and doc.filename:
+ # Document object with proper metadata
+ mime_type = getattr(doc, 'mimeType', 'application/octet-stream')
+
+ # Enhanced MIME type detection for document objects
+ if mime_type == "application/octet-stream":
+ mime_type = self._detectMimeTypeFromDocument(doc, doc.filename)
+
+ processed_documents.append({
+ 'filename': doc.filename,
+ 'fileSize': getattr(doc, 'fileSize', 0),
+ 'mimeType': mime_type,
+ 'content': getattr(doc, 'content', ''),
+ 'document': doc
+ })
+ elif isinstance(doc, dict):
+ # Dictionary document with metadata
+ filename = doc.get('documentName', doc.get('filename', f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"))
+ fileSize = doc.get('fileSize', len(str(doc.get('documentData', ''))))
+ mimeType = doc.get('mimeType', 'application/octet-stream')
+
+ # Enhanced MIME type detection for dictionary documents
+ if mimeType == "application/octet-stream":
+ document_data = doc.get('documentData', '')
+ mimeType = self._detectMimeTypeFromContent(document_data, filename)
+
+ processed_documents.append({
+ 'filename': filename,
+ 'fileSize': fileSize,
+ 'mimeType': mimeType,
+ 'content': doc.get('documentData', ''),
+ 'document': doc
+ })
+ else:
+ # Fallback for unknown document types
+ logger.warning(f"Unknown document type for action {action.execMethod}.{action.execAction}: {type(doc)}")
+ filename = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
+ mimeType = 'application/octet-stream'
+
+ # Try to detect MIME type for unknown document types
+ mimeType = self._detectMimeTypeFromContent(doc, filename)
+
+ processed_documents.append({
+ 'filename': filename,
+ 'fileSize': 0,
+ 'mimeType': mimeType,
+ 'content': str(doc),
+ 'document': doc
+ })
+
return {
"status": "completed" if result.success else "failed",
"result": result.data.get("result", ""),
"error": result.error or "",
"resultLabel": result_label,
- "documents": result.data.get("documents", []),
+ "documents": processed_documents,
"actionId": action.id,
"actionMethod": action.execMethod,
"actionName": action.execAction
@@ -763,11 +926,12 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
"error": str(e),
"actionId": action.id,
"actionMethod": action.execMethod,
- "actionName": action.execAction
+ "actionName": action.execAction,
+ "documents": []
}
async def _createActionMessage(self, action: TaskAction, result: Any, workflow: ChatWorkflow, result_label: str = None) -> None:
- """Create and store a message for the action result in the workflow"""
+ """Create and store a message for the action result in the workflow with enhanced document processing"""
try:
# Get result data
result_data = result.data if hasattr(result, 'data') else {}
@@ -795,16 +959,37 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
processed_documents = []
for doc_data in documents_data:
try:
- # Extract document information
- document_name = doc_data.get("documentName", f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}")
- document_data = doc_data.get("documentData", {})
+ # Handle different document data formats
+ if isinstance(doc_data, dict):
+ # Enhanced document processing for dictionary format
+ document_name = doc_data.get("documentName", doc_data.get("filename", f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"))
+ document_data = doc_data.get("documentData", {})
+ file_size = doc_data.get("fileSize", 0)
+ mime_type = doc_data.get("mimeType", "application/octet-stream")
+ elif hasattr(doc_data, 'filename'):
+ # Document object format
+ document_name = doc_data.filename
+ document_data = getattr(doc_data, 'content', {})
+ file_size = getattr(doc_data, 'fileSize', 0)
+ mime_type = getattr(doc_data, 'mimeType', "application/octet-stream")
+ else:
+ # Fallback for unknown formats
+ document_name = f"{action.execMethod}_{action.execAction}_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}"
+ document_data = doc_data
+ file_size = len(str(doc_data))
+ mime_type = "application/octet-stream"
- # Determine file extension and MIME type
- file_extension = self._getFileExtension(document_name)
- mime_type = self._getMimeType(file_extension)
+ # Enhanced MIME type detection using service container
+ if mime_type == "application/octet-stream":
+ mime_type = self._detectMimeTypeFromContent(document_data, document_name)
# Convert document data to string content
- content = self._convertDocumentDataToString(document_data, file_extension)
+ content = self._convertDocumentDataToString(document_data, self._getFileExtension(document_name))
+
+ # Validate content before creating file
+ if not content or content.strip() == "":
+ logger.warning(f"Empty content for document {document_name}, skipping")
+ continue
# Create file in database
file_id = self.service.createFile(
@@ -814,6 +999,10 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
base64encoded=False
)
+ if not file_id:
+ logger.error(f"Failed to create file for document {document_name}")
+ continue
+
# Create ChatDocument object
document = self.service.createDocument(
fileName=document_name,
@@ -824,7 +1013,9 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
if document:
processed_documents.append(document)
- logger.info(f"Created document: {document_name} with file ID: {file_id}")
+ logger.info(f"Created document: {document_name} with file ID: {file_id} and MIME type: {mime_type}")
+ else:
+ logger.error(f"Failed to create ChatDocument object for {document_name}")
except Exception as e:
logger.error(f"Error processing document {doc_data.get('documentName', 'unknown')}: {str(e)}")
@@ -846,42 +1037,93 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
def _getFileExtension(self, filename: str) -> str:
"""Extract file extension from filename"""
- if '.' in filename:
- return filename.split('.')[-1].lower()
- return "txt" # Default to text
+ return self.service.getFileExtension(filename)
def _getMimeType(self, extension: str) -> str:
"""Get MIME type based on file extension"""
- mime_types = {
- 'txt': 'text/plain',
- 'json': 'application/json',
- 'xml': 'application/xml',
- 'csv': 'text/csv',
- 'html': 'text/html',
- 'md': 'text/markdown',
- 'py': 'text/x-python',
- 'js': 'application/javascript',
- 'css': 'text/css',
- 'pdf': 'application/pdf',
- 'doc': 'application/msword',
- 'docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
- 'xls': 'application/vnd.ms-excel',
- 'xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- 'ppt': 'application/vnd.ms-powerpoint',
- 'pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
- }
- return mime_types.get(extension, 'application/octet-stream')
+ return self.service.getMimeTypeFromExtension(extension)
+
+ def _detectMimeTypeFromContent(self, content: Any, filename: str) -> str:
+ """
+ Detect MIME type from content and filename using service container.
+ Only returns a detected MIME type if it's better than application/octet-stream.
+
+ Args:
+ content: Content data (string, dict, or other)
+ filename: Name of the file
+
+ Returns:
+ str: Detected MIME type or original if detection failed
+ """
+ try:
+ # Convert content to bytes for MIME type detection
+ if isinstance(content, str):
+ file_bytes = content.encode('utf-8')
+ elif isinstance(content, dict):
+ import json
+ file_bytes = json.dumps(content, ensure_ascii=False).encode('utf-8')
+ else:
+ file_bytes = str(content).encode('utf-8')
+
+ # Use service container's MIME type detection
+ detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename)
+ if detected_mime_type != "application/octet-stream":
+ return detected_mime_type
+ return "application/octet-stream"
+ except Exception as e:
+ logger.warning(f"Error in MIME type detection for {filename}: {str(e)}")
+ return 'application/octet-stream'
+
+ def _detectMimeTypeFromDocument(self, document: Any, filename: str) -> str:
+ """
+ Detect MIME type from document object using service container.
+ Only returns a detected MIME type if it's better than application/octet-stream.
+
+ Args:
+ document: Document object with content attribute
+ filename: Name of the file
+
+ Returns:
+ str: Detected MIME type or original if detection failed
+ """
+ try:
+ # Get document content as bytes for MIME type detection
+ content = getattr(document, 'content', '')
+ if isinstance(content, str):
+ file_bytes = content.encode('utf-8')
+ else:
+ file_bytes = str(content).encode('utf-8')
+
+ # Use service container's MIME type detection
+ detected_mime_type = self.service.detectContentTypeFromData(file_bytes, filename)
+ if detected_mime_type != "application/octet-stream":
+ return detected_mime_type
+ return "application/octet-stream"
+ except Exception as e:
+ logger.warning(f"Error in MIME type detection for document {filename}: {str(e)}")
+ return 'application/octet-stream'
def _convertDocumentDataToString(self, document_data: Dict[str, Any], file_extension: str) -> str:
- """Convert document data to string content based on file type"""
+ """Convert document data to string content based on file type with enhanced processing"""
try:
- if file_extension == 'json':
- return json.dumps(document_data, indent=2, ensure_ascii=False)
- elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']:
+ # Handle None or empty data
+ if document_data is None:
+ return ""
+
+ # Handle string data directly
+ if isinstance(document_data, str):
+ return document_data
+
+ # Handle dictionary data
+ if isinstance(document_data, dict):
+ # For JSON files, return formatted JSON
+ if file_extension == 'json':
+ return json.dumps(document_data, indent=2, ensure_ascii=False)
+
# For text files, try to extract text content
- if isinstance(document_data, dict):
+ elif file_extension in ['txt', 'md', 'html', 'css', 'js', 'py']:
# Look for common text content fields
- text_fields = ['content', 'text', 'data', 'result', 'summary']
+ text_fields = ['content', 'text', 'data', 'result', 'summary', 'extracted_content', 'table_data']
for field in text_fields:
if field in document_data:
content = document_data[field]
@@ -892,20 +1134,73 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# If no text field found, convert entire dict to JSON
return json.dumps(document_data, indent=2, ensure_ascii=False)
- elif isinstance(document_data, str):
- return document_data
- else:
- return str(document_data)
- else:
+
+ # For CSV files, try to extract table data
+ elif file_extension == 'csv':
+ # Look for CSV-specific fields
+ csv_fields = ['table_data', 'csv_data', 'rows', 'data']
+ for field in csv_fields:
+ if field in document_data:
+ content = document_data[field]
+ if isinstance(content, str):
+ return content
+ elif isinstance(content, list):
+ # Convert list of rows to CSV format
+ if content and isinstance(content[0], (list, dict)):
+ import csv
+ import io
+ output = io.StringIO()
+ if isinstance(content[0], dict):
+ # List of dictionaries
+ if content:
+ fieldnames = content[0].keys()
+ writer = csv.DictWriter(output, fieldnames=fieldnames)
+ writer.writeheader()
+ writer.writerows(content)
+ else:
+ # List of lists
+ writer = csv.writer(output)
+ writer.writerows(content)
+ return output.getvalue()
+
+ # Fallback to JSON if no CSV data found
+ return json.dumps(document_data, indent=2, ensure_ascii=False)
+
# For other file types, convert to JSON
- return json.dumps(document_data, indent=2, ensure_ascii=False)
+ else:
+ return json.dumps(document_data, indent=2, ensure_ascii=False)
+
+ # Handle list data
+ elif isinstance(document_data, list):
+ if file_extension == 'csv':
+ # Convert list to CSV format
+ import csv
+ import io
+ output = io.StringIO()
+ if document_data and isinstance(document_data[0], dict):
+ # List of dictionaries
+ fieldnames = document_data[0].keys()
+ writer = csv.DictWriter(output, fieldnames=fieldnames)
+ writer.writeheader()
+ writer.writerows(document_data)
+ else:
+ # List of lists
+ writer = csv.writer(output)
+ writer.writerows(document_data)
+ return output.getvalue()
+ else:
+ return json.dumps(document_data, indent=2, ensure_ascii=False)
+
+ # Handle other data types
+ else:
+ return str(document_data)
except Exception as e:
logger.error(f"Error converting document data to string: {str(e)}")
return str(document_data)
async def _performTaskReview(self, review_context: Dict[str, Any]) -> Dict[str, Any]:
- """Perform AI-based task review"""
+ """Perform AI-based task review with enhanced retry logic"""
try:
# Prepare prompt for result review
prompt = self._createResultReviewPrompt(review_context)
@@ -921,6 +1216,38 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
review.setdefault('reason', 'No reason provided')
review.setdefault('quality_score', 5)
+ # Enhanced retry logic based on result quality
+ if review.get('status') == 'retry':
+ # Analyze the specific issues for better retry guidance
+ action_results = review_context.get('action_results', [])
+ if action_results:
+ # Check for common issues that warrant retry
+ has_empty_results = any(
+ not result.get('result', '').strip()
+ for result in action_results
+ if result.get('status') == 'completed'
+ )
+
+ has_incomplete_metadata = any(
+ any(doc.get('filename') == 'unknown' for doc in result.get('documents_metadata', []))
+ for result in action_results
+ if result.get('status') == 'completed'
+ )
+
+ if has_empty_results:
+ review['improvements'] = (review.get('improvements', '') +
+ " Ensure the document extraction returns actual content, not empty results. " +
+ "Check if the AI prompt is specific enough to extract meaningful data.")
+
+ if has_incomplete_metadata:
+ review['improvements'] = (review.get('improvements', '') +
+ " Ensure proper document metadata is extracted including filename, size, and mime type. " +
+ "The document processing should provide complete file information.")
+
+ # If we have specific issues, adjust quality score
+ if has_empty_results or has_incomplete_metadata:
+ review['quality_score'] = max(1, review.get('quality_score', 5) - 2)
+
return review
except Exception as e:
@@ -1110,10 +1437,20 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
# ===== UNIFIED WORKFLOW EXECUTION =====
async def executeUnifiedWorkflow(self, userInput: str, workflow: ChatWorkflow) -> Dict[str, Any]:
- """Execute workflow using the new unified phases"""
+ """Execute workflow using the new unified phases with retry logic"""
try:
logger.info(f"Starting unified workflow execution for workflow {workflow.id}")
+ # Create user-friendly progress log
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": "Starting workflow analysis and planning",
+ "type": "info",
+ "status": "running",
+ "progress": 5,
+ "agentName": "System"
+ })
+
# Phase 1: High-Level Task Planning
logger.info("=== PHASE 1: HIGH-LEVEL TASK PLANNING ===")
task_plan = await self.planHighLevelTasks(userInput, workflow)
@@ -1125,95 +1462,423 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'phase': 'planning'
}
- # Log task plan details
- logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan, indent=2, ensure_ascii=False)}")
+ # Create user-friendly task plan log
+ tasks_count = len(task_plan.get('tasks', []))
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Planning completed: {tasks_count} tasks identified",
+ "type": "info",
+ "status": "running",
+ "progress": 15,
+ "agentName": "System"
+ })
- # Execute each task step
+ # Log task plan details (without document content)
+ task_plan_log = {
+ 'overview': task_plan.get('overview', ''),
+ 'tasks_count': len(task_plan.get('tasks', [])),
+ 'tasks': []
+ }
+ for task in task_plan.get('tasks', []):
+ task_log = {
+ 'id': task.get('id', ''),
+ 'description': task.get('description', ''),
+ 'dependencies': task.get('dependencies', []),
+ 'expected_outputs': task.get('expected_outputs', []),
+ 'success_criteria': task.get('success_criteria', []),
+ 'required_documents_count': len(task.get('required_documents', [])),
+ 'estimated_complexity': task.get('estimated_complexity', '')
+ }
+ task_plan_log['tasks'].append(task_log)
+ logger.debug(f"TASK PLAN CREATED: {json.dumps(task_plan_log, indent=2, ensure_ascii=False)}")
+
+ # Execute each task step with retry logic
workflow_results = []
previous_results = []
+ max_retries = 3 # Maximum retries per task
for i, task_step in enumerate(task_plan['tasks']):
- logger.info(f"=== PROCESSING TASK {i+1}/{len(task_plan['tasks'])}: {task_step.get('description', 'Unknown')} ===")
+ task_description = task_step.get('description', 'Unknown')
+ logger.info(f"=== PROCESSING TASK {i+1}/{len(task_plan['tasks'])}: {task_description} ===")
- # Phase 2: Define Task Actions
- logger.info(f"--- PHASE 2: DEFINING ACTIONS FOR TASK {i+1} ---")
- task_actions = await self.defineTaskActions(task_step, workflow, previous_results)
- if not task_actions:
- logger.warning(f"No actions defined for task {i+1}, skipping")
- continue
+ # Create user-friendly task start log
+ progress = 20 + (i * 60 // len(task_plan['tasks']))
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Executing task {i+1}/{len(task_plan['tasks'])}: {task_description}",
+ "type": "info",
+ "status": "running",
+ "progress": progress,
+ "agentName": "System"
+ })
- # Log task actions (convert to serializable format)
- task_actions_serializable = []
- for action in task_actions:
- action_dict = {
- 'execMethod': action.execMethod,
- 'execAction': action.execAction,
- 'execParameters': action.execParameters,
- 'execResultLabel': action.execResultLabel,
- 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
- }
- task_actions_serializable.append(action_dict)
- logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}")
+ # Retry loop for each task
+ task_success = False
+ retry_count = 0
+ task_actions = []
+ action_results = []
+ review_result = {}
+ handover_data = {}
+ previous_action_results = [] # Track previous action results for retry context
+ previous_review_feedback = "" # Track previous review feedback for retry context
- # Phase 3: Execute Task Actions
- logger.info(f"--- PHASE 3: EXECUTING ACTIONS FOR TASK {i+1} ---")
- action_results = await self.executeTaskActions(task_actions, workflow)
+ while not task_success and retry_count < max_retries:
+ if retry_count > 0:
+ logger.info(f"--- RETRY {retry_count}/{max_retries} FOR TASK {i+1} ---")
+ # Create user-friendly retry log
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Retrying task {i+1} (attempt {retry_count}/{max_retries})",
+ "type": "warning",
+ "status": "running",
+ "progress": progress,
+ "agentName": "System"
+ })
+
+ try:
+ # Phase 2: Define Task Actions
+ logger.info(f"--- PHASE 2: DEFINING ACTIONS FOR TASK {i+1} ---")
+
+ # Enhanced context for retries - include previous results and feedback
+ enhanced_previous_results = previous_results.copy() if previous_results else []
+ if retry_count > 0 and previous_action_results:
+ # Add previous action results to context
+ for result in previous_action_results:
+ if result.get('resultLabel'):
+ enhanced_previous_results.append(result.get('resultLabel'))
+
+ # Create enhanced context with retry information
+ context = {
+ 'task_step': task_step,
+ 'workflow': workflow,
+ 'workflow_id': workflow.id,
+ 'available_documents': self._getAvailableDocuments(workflow),
+ 'previous_results': enhanced_previous_results,
+ 'improvements': previous_review_feedback if retry_count > 0 else None,
+ 'retry_count': retry_count,
+ 'previous_action_results': previous_action_results if retry_count > 0 else [],
+ 'previous_review_result': review_result if retry_count > 0 else None
+ }
+
+ task_actions = await self.defineTaskActions(task_step, workflow, enhanced_previous_results, context)
+ if not task_actions:
+ logger.warning(f"No actions defined for task {i+1}, skipping")
+ break
+
+ # Log task actions (convert to serializable format with metadata only)
+ task_actions_serializable = []
+ for action in task_actions:
+ # Extract only metadata from parameters, not document content
+ parameters_metadata = {}
+ if hasattr(action, 'execParameters') and action.execParameters:
+ for key, value in action.execParameters.items():
+ if key == 'documentList':
+ # Log document list as count and labels only
+ if isinstance(value, list):
+ parameters_metadata[key] = {
+ 'count': len(value),
+ 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value]
+ }
+ else:
+ parameters_metadata[key] = str(value)
+ elif key == 'aiPrompt':
+ # Truncate AI prompts to avoid logging large content
+ parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value)
+ else:
+ parameters_metadata[key] = str(value)
+
+ action_dict = {
+ 'execMethod': action.execMethod,
+ 'execAction': action.execAction,
+ 'execParameters': parameters_metadata,
+ 'execResultLabel': action.execResultLabel,
+ 'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
+ }
+ task_actions_serializable.append(action_dict)
+ logger.debug(f"TASK {i+1} ACTIONS CREATED: {json.dumps(task_actions_serializable, indent=2, ensure_ascii=False)}")
+
+ # Phase 3: Execute Task Actions
+ logger.info(f"--- PHASE 3: EXECUTING ACTIONS FOR TASK {i+1} ---")
+ action_results = await self.executeTaskActions(task_actions, workflow)
+
+ # Create user-friendly action completion log with quality metrics
+ successful_actions = sum(1 for result in action_results if result.get('status') == 'completed')
+ total_actions = len(action_results)
+
+ if total_actions > 0:
+ quality_percentage = (successful_actions / total_actions) * 100
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Task {i+1} actions completed: {successful_actions}/{total_actions} successful ({quality_percentage:.0f}% quality)",
+ "type": "success" if quality_percentage >= 80 else "warning" if quality_percentage >= 60 else "error",
+ "status": "running",
+ "progress": progress + 10,
+ "agentName": "System"
+ })
+
+ # Log action results (with metadata only)
+ action_results_metadata = []
+ for result in action_results:
+ # Extract document metadata only
+ documents_metadata = []
+ for doc in result.get('documents', []):
+ if hasattr(doc, 'filename'):
+ documents_metadata.append({
+ 'filename': doc.filename,
+ 'fileSize': getattr(doc, 'fileSize', 0),
+ 'mimeType': getattr(doc, 'mimeType', 'unknown')
+ })
+ elif isinstance(doc, dict):
+ documents_metadata.append({
+ 'filename': doc.get('filename', 'unknown'),
+ 'fileSize': doc.get('fileSize', 0),
+ 'mimeType': doc.get('mimeType', 'unknown')
+ })
+
+ result_metadata = {
+ 'status': result.get('status', ''),
+ 'result_summary': result.get('result', '')[:200] + '...' if len(result.get('result', '')) > 200 else result.get('result', ''),
+ 'error': result.get('error', ''),
+ 'resultLabel': result.get('resultLabel', ''),
+ 'documents_count': len(documents_metadata),
+ 'documents_metadata': documents_metadata,
+ 'actionId': result.get('actionId', ''),
+ 'actionMethod': result.get('actionMethod', ''),
+ 'actionName': result.get('actionName', '')
+ }
+ action_results_metadata.append(result_metadata)
+ logger.debug(f"TASK {i+1} ACTION RESULTS: {json.dumps(action_results_metadata, indent=2, ensure_ascii=False)}")
+
+ # Phase 4: Review Task Completion
+ logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---")
+ review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow)
+
+ # Create user-friendly review log with quality metrics
+ quality_metrics = review_result.get('quality_metrics', {})
+ quality_score = quality_metrics.get('score', 0)
+ confidence = quality_metrics.get('confidence', 0)
+
+ review_status = review_result.get('status', 'unknown')
+ if review_status == 'success':
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Task {i+1} completed successfully (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)",
+ "type": "success",
+ "status": "running",
+ "progress": progress + 20,
+ "agentName": "System"
+ })
+ elif review_status == 'retry':
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Task {i+1} needs improvement (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)",
+ "type": "warning",
+ "status": "running",
+ "progress": progress + 15,
+ "agentName": "System"
+ })
+ else:
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Task {i+1} failed (Quality: {quality_score:.0f}%, Confidence: {confidence:.0f}%)",
+ "type": "error",
+ "status": "running",
+ "progress": progress + 15,
+ "agentName": "System"
+ })
+
+ # Log review result (with metadata only)
+ review_result_metadata = {
+ 'status': review_result.get('status', ''),
+ 'reason': review_result.get('reason', ''),
+ 'improvements': review_result.get('improvements', ''),
+ 'quality_score': review_result.get('quality_score', 0),
+ 'missing_outputs_count': len(review_result.get('missing_outputs', [])),
+ 'met_criteria_count': len(review_result.get('met_criteria', [])),
+ 'unmet_criteria_count': len(review_result.get('unmet_criteria', []))
+ }
+ logger.debug(f"TASK {i+1} REVIEW RESULT: {json.dumps(review_result_metadata, indent=2, ensure_ascii=False)}")
+
+ # Phase 5: Prepare Task Handover
+ logger.info(f"--- PHASE 5: PREPARING TASK {i+1} HANDOVER ---")
+ handover_data = await self.prepareTaskHandover(task_step, task_actions, review_result, workflow)
+
+ # Log handover data (with metadata only)
+ handover_data_metadata = {
+ 'task_step_id': handover_data.get('task_step', {}).get('id', ''),
+ 'task_actions_count': len(handover_data.get('task_actions', [])),
+ 'review_status': handover_data.get('review_result', {}).get('status', ''),
+ 'next_task_ready': handover_data.get('next_task_ready', False),
+ 'available_results_count': len(handover_data.get('available_results', []))
+ }
+ logger.debug(f"TASK {i+1} HANDOVER DATA: {json.dumps(handover_data_metadata, indent=2, ensure_ascii=False)}")
+
+ # Check if task is successful or needs retry
+ review_status = review_result.get('status', 'unknown')
+ if review_status == 'success':
+ task_success = True
+ logger.info(f"Task {i+1} completed successfully")
+ elif review_status == 'retry':
+ # Store current results and feedback for next retry
+ previous_action_results = action_results.copy()
+ previous_review_feedback = review_result.get('improvements', '')
+
+ retry_count += 1
+ if retry_count >= max_retries:
+ logger.error(f"Task {i+1} failed after {max_retries} retries")
+ task_success = False
+ else:
+ logger.info(f"Task {i+1} needs retry (attempt {retry_count}/{max_retries})")
+ logger.info(f"Previous feedback: {previous_review_feedback}")
+ # Add delay before retry
+ await asyncio.sleep(2)
+ continue
+ elif review_status == 'failed':
+ logger.error(f"Task {i+1} failed permanently")
+ task_success = False
+ break
+ else:
+ logger.warning(f"Unknown review status '{review_status}' for task {i+1}")
+ task_success = False
+ break
+
+ except Exception as e:
+ logger.error(f"Error processing task {i+1} (attempt {retry_count + 1}): {str(e)}")
+ retry_count += 1
+ if retry_count >= max_retries:
+ logger.error(f"Task {i+1} failed after {max_retries} retries due to exceptions")
+ task_success = False
+ break
+ else:
+ logger.info(f"Retrying task {i+1} after exception")
+ await asyncio.sleep(2)
+ continue
- # Log action results
- logger.debug(f"TASK {i+1} ACTION RESULTS: {json.dumps(action_results, indent=2, ensure_ascii=False)}")
-
- # Phase 4: Review Task Completion
- logger.info(f"--- PHASE 4: REVIEWING TASK {i+1} COMPLETION ---")
- review_result = await self.reviewTaskCompletion(task_step, task_actions, action_results, workflow)
-
- # Log review result
- logger.debug(f"TASK {i+1} REVIEW RESULT: {json.dumps(review_result, indent=2, ensure_ascii=False)}")
-
- # Phase 5: Prepare Task Handover
- logger.info(f"--- PHASE 5: PREPARING TASK {i+1} HANDOVER ---")
- handover_data = await self.prepareTaskHandover(task_step, task_actions, review_result, workflow)
-
- # Log handover data
- logger.debug(f"TASK {i+1} HANDOVER DATA: {json.dumps(handover_data, indent=2, ensure_ascii=False)}")
-
- # Collect results for next iteration
+ # Collect results regardless of success/failure
workflow_results.append({
'task_step': task_step,
'task_actions': task_actions,
'action_results': action_results,
'review_result': review_result,
- 'handover_data': handover_data
+ 'handover_data': handover_data,
+ 'retry_count': retry_count,
+ 'task_success': task_success
})
- # Update previous results for next task
- previous_results = handover_data.get('available_results', [])
-
- # Check if we should continue
- if review_result.get('status') == 'failed':
- logger.error(f"Task {i+1} failed, stopping workflow")
+ # Update previous results for next task if successful
+ if task_success and handover_data.get('next_task_ready', False):
+ previous_results = handover_data.get('available_results', [])
+ else:
+ # If task failed, stop workflow
+ logger.warning(f"Task {i+1} not successful, stopping workflow")
break
# Final workflow summary
- successful_tasks = sum(1 for result in workflow_results if result['review_result'].get('status') == 'success')
+ successful_tasks = sum(1 for result in workflow_results if result.get('task_success', False))
total_tasks = len(workflow_results)
- # Create serializable workflow results
+ # Create final user-friendly completion log
+ if successful_tasks == total_tasks:
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow completed successfully: {successful_tasks}/{total_tasks} tasks completed",
+ "type": "success",
+ "status": "completed",
+ "progress": 100,
+ "agentName": "System"
+ })
+ elif successful_tasks > 0:
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow completed partially: {successful_tasks}/{total_tasks} tasks completed",
+ "type": "warning",
+ "status": "completed",
+ "progress": 100,
+ "agentName": "System"
+ })
+ else:
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow failed: {successful_tasks}/{total_tasks} tasks completed",
+ "type": "error",
+ "status": "failed",
+ "progress": 100,
+ "agentName": "System"
+ })
+
+ # Create serializable workflow results (with metadata only)
workflow_results_serializable = []
for result in workflow_results:
+ # Extract action results metadata
+ action_results_metadata = []
+ for action_result in result.get('action_results', []):
+ documents_metadata = []
+ for doc in action_result.get('documents', []):
+ if hasattr(doc, 'filename'):
+ documents_metadata.append({
+ 'filename': doc.filename,
+ 'fileSize': getattr(doc, 'fileSize', 0),
+ 'mimeType': getattr(doc, 'mimeType', 'unknown')
+ })
+ elif isinstance(doc, dict):
+ documents_metadata.append({
+ 'filename': doc.get('filename', 'unknown'),
+ 'fileSize': doc.get('fileSize', 0),
+ 'mimeType': doc.get('mimeType', 'unknown')
+ })
+
+ action_result_metadata = {
+ 'status': action_result.get('status', ''),
+ 'result_summary': action_result.get('result', '')[:200] + '...' if len(action_result.get('result', '')) > 200 else action_result.get('result', ''),
+ 'error': action_result.get('error', ''),
+ 'resultLabel': action_result.get('resultLabel', ''),
+ 'documents_count': len(documents_metadata),
+ 'documents_metadata': documents_metadata,
+ 'actionId': action_result.get('actionId', ''),
+ 'actionMethod': action_result.get('actionMethod', ''),
+ 'actionName': action_result.get('actionName', '')
+ }
+ action_results_metadata.append(action_result_metadata)
+
serializable_result = {
'task_step': result['task_step'],
- 'action_results': result['action_results'],
+ 'action_results': action_results_metadata,
'review_result': result['review_result'],
- 'handover_data': result['handover_data']
+ 'handover_data': {
+ 'task_step_id': result['handover_data'].get('task_step', {}).get('id', ''),
+ 'task_actions_count': len(result['handover_data'].get('task_actions', [])),
+ 'review_status': result['handover_data'].get('review_result', {}).get('status', ''),
+ 'next_task_ready': result['handover_data'].get('next_task_ready', False),
+ 'available_results_count': len(result['handover_data'].get('available_results', []))
+ },
+ 'retry_count': result.get('retry_count', 0),
+ 'task_success': result.get('task_success', False)
}
- # Convert task_actions to serializable format
+ # Convert task_actions to serializable format with metadata only
if 'task_actions' in result:
task_actions_serializable = []
for action in result['task_actions']:
+ # Extract only metadata from parameters
+ parameters_metadata = {}
+ if hasattr(action, 'execParameters') and action.execParameters:
+ for key, value in action.execParameters.items():
+ if key == 'documentList':
+ if isinstance(value, list):
+ parameters_metadata[key] = {
+ 'count': len(value),
+ 'labels': [str(v).split(':')[-1] if ':' in str(v) else str(v) for v in value]
+ }
+ else:
+ parameters_metadata[key] = str(value)
+ elif key == 'aiPrompt':
+ parameters_metadata[key] = str(value)[:100] + '...' if len(str(value)) > 100 else str(value)
+ else:
+ parameters_metadata[key] = str(value)
+
action_dict = {
'execMethod': action.execMethod,
'execAction': action.execAction,
- 'execParameters': action.execParameters,
+ 'execParameters': parameters_metadata,
'execResultLabel': action.execResultLabel,
'status': action.status.value if hasattr(action.status, 'value') else str(action.status)
}
@@ -1225,8 +1890,8 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
'status': 'completed' if successful_tasks == total_tasks else 'partial',
'successful_tasks': successful_tasks,
'total_tasks': total_tasks,
- 'workflow_results': workflow_results_serializable,
- 'final_results': previous_results
+ 'workflow_results_count': len(workflow_results_serializable),
+ 'final_results_count': len(previous_results)
}
logger.info(f"=== UNIFIED WORKFLOW COMPLETED: {successful_tasks}/{total_tasks} tasks successful ===")
@@ -1235,6 +1900,15 @@ NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
except Exception as e:
logger.error(f"Error in unified workflow execution: {str(e)}")
+ # Create error log for user
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow execution failed: {str(e)}",
+ "type": "error",
+ "status": "failed",
+ "progress": 100,
+ "agentName": "System"
+ })
return {
'status': 'failed',
'error': str(e),
diff --git a/modules/workflow/managerDocument.py b/modules/workflow/managerDocument.py
index 4c182eaf..b1b8e709 100644
--- a/modules/workflow/managerDocument.py
+++ b/modules/workflow/managerDocument.py
@@ -17,8 +17,8 @@ class DocumentManager:
def __init__(self, serviceContainer):
self.service = serviceContainer
- # Create processor without any dependencies
- self._processor = DocumentProcessor()
+ # Create processor with service container for AI calls
+ self._processor = DocumentProcessor(serviceContainer)
async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> ExtractedContent:
"""Extract content from ChatDocument using prompt"""
diff --git a/modules/workflow/managerWorkflow.py b/modules/workflow/managerWorkflow.py
index 7291f6d9..ad06111e 100644
--- a/modules/workflow/managerWorkflow.py
+++ b/modules/workflow/managerWorkflow.py
@@ -52,8 +52,56 @@ class WorkflowManager:
except WorkflowStoppedException:
logger.info("Workflow stopped by user")
+ # Update workflow status to stopped
+ workflow.status = "stopped"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "stopped",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Add log entry
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": "Workflow stopped by user",
+ "type": "warning",
+ "status": "stopped",
+ "progress": 100
+ })
+
except Exception as e:
logger.error(f"Workflow processing error: {str(e)}")
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Create error message
+ error_message = {
+ "workflowId": workflow.id,
+ "role": "assistant",
+ "message": f"Workflow processing failed: {str(e)}",
+ "status": "last",
+ "sequenceNr": len(workflow.messages) + 1,
+ "publishedAt": datetime.now(UTC).isoformat()
+ }
+ message = self.chatInterface.createWorkflowMessage(error_message)
+ if message:
+ workflow.messages.append(message)
+
+ # Add error log entry
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": f"Workflow failed: {str(e)}",
+ "type": "error",
+ "status": "failed",
+ "progress": 100
+ })
+
raise
async def _sendFirstMessage(self, userInput: UserInputRequest, workflow: ChatWorkflow) -> ChatMessage:
@@ -108,6 +156,25 @@ class WorkflowManager:
if message:
workflow.messages.append(message)
+ # Update workflow status to completed
+ workflow.status = "completed"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+
+ # Update workflow in database
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
+ # Add completion log entry
+ self.chatInterface.createWorkflowLog({
+ "workflowId": workflow.id,
+ "message": "Workflow completed successfully",
+ "type": "success",
+ "status": "completed",
+ "progress": 100
+ })
+
except Exception as e:
logger.error(f"Error sending last message: {str(e)}")
raise
@@ -128,6 +195,14 @@ class WorkflowManager:
message = self.chatInterface.createWorkflowMessage(error_message)
if message:
workflow.messages.append(message)
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity
+ })
return
# Process successful workflow results
@@ -174,6 +249,14 @@ class WorkflowManager:
if message:
workflow.messages.append(message)
+ # Update workflow status to completed for successful workflows
+ workflow.status = "completed"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "completed",
+ "lastActivity": workflow.lastActivity
+ })
+
except Exception as e:
logger.error(f"Error processing workflow results: {str(e)}")
# Create error message
@@ -188,4 +271,12 @@ class WorkflowManager:
message = self.chatInterface.createWorkflowMessage(error_message)
if message:
workflow.messages.append(message)
+
+ # Update workflow status to failed
+ workflow.status = "failed"
+ workflow.lastActivity = datetime.now(UTC).isoformat()
+ self.chatInterface.updateWorkflow(workflow.id, {
+ "status": "failed",
+ "lastActivity": workflow.lastActivity
+ })
diff --git a/modules/workflow/processorDocument.py b/modules/workflow/processorDocument.py
index 1ea5084c..75929f86 100644
--- a/modules/workflow/processorDocument.py
+++ b/modules/workflow/processorDocument.py
@@ -32,9 +32,10 @@ class FileProcessingError(Exception):
class DocumentProcessor:
"""Processor for handling document operations and content extraction."""
- def __init__(self):
+ def __init__(self, serviceContainer=None):
"""Initialize the document processor."""
self._neutralizer = DataAnonymizer() if APP_CONFIG.get("ENABLE_CONTENT_NEUTRALIZATION", False) else None
+ self._serviceContainer = serviceContainer
self.supportedTypes: Dict[str, Callable[[bytes, str, str], Awaitable[List[ContentItem]]]] = {
'text/plain': self._processText,
@@ -108,7 +109,9 @@ class DocumentProcessor:
logger.info("Image processing libraries successfully loaded")
except ImportError as e:
logger.warning(f"Image processing libraries could not be loaded: {e}")
-
+
+
+
async def processFileData(self, fileData: bytes, filename: str, mimeType: str, base64Encoded: bool = False, prompt: str = None, documentId: str = None) -> ExtractedContent:
"""
Process file data directly and extract its contents with AI processing.
@@ -133,7 +136,7 @@ class DocumentProcessor:
# Detect content type if needed
if mimeType == "application/octet-stream":
- mimeType = self._detectContentTypeFromData(fileData, filename)
+ mimeType = self._serviceContainer.detectContentTypeFromData(fileData, filename)
# Process document based on type
if mimeType not in self.supportedTypes:
@@ -161,61 +164,8 @@ class DocumentProcessor:
except Exception as e:
logger.error(f"Error processing file data: {str(e)}")
raise FileProcessingError(f"Failed to process file data: {str(e)}")
-
- def _detectContentTypeFromData(self, fileData: bytes, filename: str) -> str:
- """Detect content type from file data and filename"""
- try:
- # Check file extension first
- ext = os.path.splitext(filename)[1].lower()
- if ext:
- # Map common extensions to MIME types
- extToMime = {
- '.txt': 'text/plain',
- '.md': 'text/markdown',
- '.csv': 'text/csv',
- '.json': 'application/json',
- '.xml': 'application/xml',
- '.js': 'application/javascript',
- '.py': 'application/x-python',
- '.svg': 'image/svg+xml',
- '.jpg': 'image/jpeg',
- '.png': 'image/png',
- '.gif': 'image/gif',
- '.pdf': 'application/pdf',
- '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
- '.doc': 'application/msword',
- '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
- '.xls': 'application/vnd.ms-excel',
- '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
- '.ppt': 'application/vnd.ms-powerpoint'
- }
- if ext in extToMime:
- return extToMime[ext]
-
- # Try to detect from content
- if fileData.startswith(b'%PDF'):
- return 'application/pdf'
- elif fileData.startswith(b'PK\x03\x04'):
- # ZIP-based formats (docx, xlsx, pptx)
- return 'application/zip'
- elif fileData.startswith(b'<'):
- # XML-based formats
- try:
- text = fileData.decode('utf-8', errors='ignore')
- if '