diff --git a/GOOGLE_OAUTH_SETUP.md b/GOOGLE_OAUTH_SETUP.md new file mode 100644 index 00000000..85d4500f --- /dev/null +++ b/GOOGLE_OAUTH_SETUP.md @@ -0,0 +1,114 @@ +# Google OAuth 2.0 Setup Guide for PowerOn + +## Overview +This guide explains how to set up Google OAuth 2.0 authentication for the PowerOn application. + +## Prerequisites +- A Google account +- Access to Google Cloud Console (https://console.cloud.google.com/) + +## Step 1: Create a Google Cloud Project + +1. Go to [Google Cloud Console](https://console.cloud.google.com/) +2. Click on the project dropdown at the top of the page +3. Click "New Project" +4. Enter a project name (e.g., "PowerOn OAuth") +5. Click "Create" + +## Step 2: Enable Google+ API + +1. In your new project, go to "APIs & Services" > "Library" +2. Search for "Google+ API" or "Google Identity" +3. Click on "Google+ API" and click "Enable" + +## Step 3: Create OAuth 2.0 Credentials + +1. Go to "APIs & Services" > "Credentials" +2. Click "Create Credentials" > "OAuth client ID" +3. If prompted, configure the OAuth consent screen first: + - Choose "External" user type + - Fill in the required fields (App name, User support email, Developer contact information) + - Add scopes: `https://www.googleapis.com/auth/userinfo.profile`, `https://www.googleapis.com/auth/userinfo.email` + - Add test users if needed + - Click "Save and Continue" through all sections + +4. Back to creating OAuth client ID: + - Application type: "Web application" + - Name: "PowerOn Web Client" + - Authorized redirect URIs: Add your redirect URI + - For development: `http://localhost:8000/api/google/auth/callback` + - For production: `https://yourdomain.com/api/google/auth/callback` + +5. Click "Create" +6. **Important**: Copy the Client ID and Client Secret - you'll need these for the next step + +## Step 4: Configure PowerOn Application + +1. Open your environment file (`gateway/env_dev.env` for development) +2. Replace the placeholder values with your actual Google OAuth credentials: + +```env +# Google OAuth Configuration +Service_GOOGLE_CLIENT_ID = your-actual-client-id-from-google-console +Service_GOOGLE_CLIENT_SECRET = your-actual-client-secret-from-google-console +Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback +``` + +3. Save the file +4. Restart your PowerOn gateway server + +## Step 5: Test the Configuration + +1. Start your PowerOn application +2. Go to the Connections module +3. Click "Connect Google" +4. You should be redirected to Google's OAuth consent screen +5. After authorization, you should be redirected back to PowerOn + +## Troubleshooting + +### Common Issues + +#### 1. "Missing required parameter: redirect_uri" +- **Cause**: Google OAuth client is not properly configured with the redirect URI +- **Solution**: Ensure the redirect URI in Google Cloud Console exactly matches your application's callback URL + +#### 2. "Invalid client" error +- **Cause**: Client ID or Client Secret is incorrect +- **Solution**: Double-check the credentials in your environment file + +#### 3. "Redirect URI mismatch" error +- **Cause**: The redirect URI in your OAuth request doesn't match what's configured in Google Cloud Console +- **Solution**: Ensure both URIs are identical (including protocol, domain, port, and path) + +### Debug Steps + +1. Check the PowerOn gateway logs for OAuth configuration details +2. Verify environment variables are loaded correctly +3. Ensure the Google OAuth client is configured for "Web application" type +4. Check that the redirect URI includes the full path: `/api/google/auth/callback` + +## Security Notes + +- **Never commit** your Google OAuth credentials to version control +- Use environment variables or secure configuration management +- Regularly rotate your client secrets +- Monitor OAuth usage in Google Cloud Console + +## Production Considerations + +For production deployment: + +1. Use HTTPS for all OAuth redirects +2. Configure proper domain verification in Google Cloud Console +3. Set up monitoring and alerting for OAuth usage +4. Consider implementing additional security measures like PKCE (Proof Key for Code Exchange) + +## Support + +If you continue to experience issues: + +1. Check the PowerOn gateway logs for detailed error messages +2. Verify your Google OAuth configuration in Google Cloud Console +3. Test with a simple OAuth flow to isolate the issue +4. Ensure your Google Cloud project has billing enabled (required for some APIs) diff --git a/app.py b/app.py index 81b1c9af..bfe82f8f 100644 --- a/app.py +++ b/app.py @@ -53,7 +53,8 @@ def initLogging(): 'response_closed.started', '_send_single_request', 'httpcore.http11', - 'httpx._client' + 'httpx._client', + 'HTTP Request' ] return not any(pattern in record.msg for pattern in http_debug_patterns) return True diff --git a/config.ini b/config.ini index 799d31ba..a5284bde 100644 --- a/config.ini +++ b/config.ini @@ -55,6 +55,5 @@ Service_MSFT_CLIENT_SECRET = Kxf8Q~2lJIteZ~JaI32kMf1lfaWKATqxXiNiFbzV Service_MSFT_TENANT_ID = common # Google Service configuration -Service_GOOGLE_CLIENT_ID = your-google-client-id -Service_GOOGLE_CLIENT_SECRET = your-google-client-secret -Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback +Service_GOOGLE_CLIENT_ID = 354925410565-aqs2b2qaiqmm73qpjnel6al8eid78uvg.apps.googleusercontent.com +Service_GOOGLE_CLIENT_SECRET = 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 diff --git a/env_dev.env b/env_dev.env deleted file mode 100644 index 8e5f8572..00000000 --- a/env_dev.env +++ /dev/null @@ -1,45 +0,0 @@ -# Development Environment Configuration - -# System Configuration -APP_ENV_TYPE = dev -APP_ENV_LABEL = Development Instance Patrick -APP_API_URL = http://localhost:8000 - -# Database Configuration for Application -DB_APP_HOST=D:/Temp/_powerondb -DB_APP_DATABASE=app -DB_APP_USER=dev_user -DB_APP_PASSWORD_SECRET=dev_password - -# Database Configuration Chat -DB_CHAT_HOST=D:/Temp/_powerondb -DB_CHAT_DATABASE=chat -DB_CHAT_USER=dev_user -DB_CHAT_PASSWORD_SECRET=dev_password - -# Database Configuration Management -DB_MANAGEMENT_HOST=D:/Temp/_powerondb -DB_MANAGEMENT_DATABASE=management -DB_MANAGEMENT_USER=dev_user -DB_MANAGEMENT_PASSWORD_SECRET=dev_password - -# Security Configuration -APP_JWT_SECRET_SECRET=dev_jwt_secret_token -APP_TOKEN_EXPIRY=300 - -# CORS Configuration -APP_ALLOWED_ORIGINS=http://localhost:8080,https://playground.poweron-center.net,http://localhost:5176,https://nyla.poweron-center.net - -# Logging configuration -APP_LOGGING_LOG_LEVEL = DEBUG -APP_LOGGING_LOG_FILE = poweron.log -APP_LOGGING_FORMAT = %(asctime)s - %(levelname)s - %(name)s - %(message)s -APP_LOGGING_DATE_FORMAT = %Y-%m-%d %H:%M:%S -APP_LOGGING_CONSOLE_ENABLED = True -APP_LOGGING_FILE_ENABLED = True -APP_LOGGING_ROTATION_SIZE = 10485760 -APP_LOGGING_BACKUP_COUNT = 5 - -# Service Redirects -Service_MSFT_REDIRECT_URI = http://localhost:8000/api/msft/auth/callback -Service_GOOGLE_REDIRECT_URI = http://localhost:8000/api/google/auth/callback \ No newline at end of file diff --git a/env_prod.env b/env_prod.env index ed6dfc52..8415cb8c 100644 --- a/env_prod.env +++ b/env_prod.env @@ -42,4 +42,4 @@ APP_LOGGING_BACKUP_COUNT = 5 # Service Redirects Service_MSFT_REDIRECT_URI = https://gateway.poweron-center.net/api/msft/auth/callback -Service_GOOGLE_REDIRECT_URI = http://gateway.poweron-center.net/api/google/auth/callback +Service_GOOGLE_REDIRECT_URI = https://gateway.poweron-center.net/api/google/auth/callback diff --git a/modules/chat/BACKUP managerChat.py b/modules/chat/BACKUP managerChat.py deleted file mode 100644 index 056efcbc..00000000 --- a/modules/chat/BACKUP managerChat.py +++ /dev/null @@ -1,3078 +0,0 @@ -import asyncio -import logging -import uuid -import json -import time -from typing import Dict, Any, Optional, List, Union -from datetime import datetime, UTC - -from modules.interfaces.interfaceAppModel import User -from modules.interfaces.interfaceChatModel import ( - TaskStatus, ChatDocument, TaskItem, TaskAction, TaskResult, ChatStat, ChatLog, ChatMessage, ChatWorkflow, UserInputRequest, ActionResult, - ExtractedContent, ContentItem, ContentMetadata, DocumentExchange, TaskStep, TaskContext, ActionExecutionResult, ReviewContext, ReviewResult, TaskPlan, WorkflowResult -) -from modules.chat.serviceCenter import ServiceCenter -from modules.interfaces.interfaceChatObjects import ChatObjects - -logger = logging.getLogger(__name__) - -# ===== STATE MANAGEMENT AND VALIDATION CLASSES ===== - -class TaskExecutionState: - """Manages state during task execution with retry logic""" - def __init__(self, task_step: TaskStep): - self.task_step = task_step - self.successful_actions: List[ActionExecutionResult] = [] # Preserved across retries - self.failed_actions: List[ActionExecutionResult] = [] # For analysis - self.current_action_index = 0 - self.retry_count = 0 - self.improvements = [] - self.partial_results = {} # Store intermediate results - self.max_retries = 3 - def addSuccessfulAction(self, action_result: ActionExecutionResult): - self.successful_actions.append(action_result) - if action_result.data.get('resultLabel'): - self.partial_results[action_result.data['resultLabel']] = action_result - def addFailedAction(self, action_result: ActionExecutionResult): - self.failed_actions.append(action_result) - def getAvailableResults(self) -> list: - return [result.data.get('resultLabel', '') for result in self.successful_actions if result.data.get('resultLabel')] - def shouldRetryTask(self) -> bool: - return len(self.successful_actions) > 0 and len(self.failed_actions) > 0 - def canRetry(self) -> bool: - return self.retry_count < self.max_retries - def incrementRetryCount(self): - self.retry_count += 1 - def getFailurePatterns(self) -> list: - patterns = [] - for action in self.failed_actions: - error = action.error.lower() if action.error else '' - if "timeout" in error: - patterns.append("timeout_issues") - elif "document_not_found" in error or "file not found" in error: - patterns.append("document_reference_issues") - elif "empty_result" in error or "no content" in error: - patterns.append("content_extraction_issues") - elif "invalid_format" in error or "wrong format" in error: - patterns.append("format_issues") - elif "permission" in error or "access denied" in error: - patterns.append("permission_issues") - return list(set(patterns)) - -class ActionValidator: - """Generic AI-based action result validation""" - def __init__(self, chat_manager): - self.chat_manager = chat_manager - - async def validateActionResult(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> dict: - """Generic action validation using AI""" - try: - # Create generic validation prompt - prompt = self._createGenericValidationPrompt(action_result, action, context) - response = await self.chat_manager._callAIWithCircuitBreaker(prompt, "action_validation") - validation = self._parseValidationResponse(response) - - # Add action metadata - validation['action_id'] = action.id - validation['action_method'] = action.execMethod - validation['action_name'] = action.execAction - validation['result_label'] = action.execResultLabel - - return validation - except Exception as e: - logger.error(f"Error validating action result: {str(e)}") - return { - 'status': 'success', - 'reason': f'Validation failed: {str(e)}', - 'confidence': 0.5, - 'improvements': [], - 'action_id': action.id, - 'action_method': action.execMethod, - 'action_name': action.execAction, - 'result_label': action.execResultLabel - } - - def _createGenericValidationPrompt(self, action_result: ActionResult, action: TaskAction, context: TaskContext) -> str: - """Create a validation prompt focused on result file delivery""" - # Extract data from ActionResult model - success = action_result.success - result_data = action_result.data - error = action_result.error - validation_messages = action_result.validation - - # Extract result text from data - result_text = result_data.get("result", "") if isinstance(result_data, dict) else str(result_data) - - # Get documents from ActionResult data - documents = result_data.get("documents", []) if isinstance(result_data, dict) else [] - doc_count = len(documents) - - # Extract expected result format from action parameters - expected_result_label = action.execResultLabel - expected_format = action.execParameters.get('outputFormat', 'unknown') - - # Extract expected document formats from action - expected_document_formats = action.expectedDocumentFormats or [] - - # Check if the result label is present in the action result data - actual_result_label = result_data.get("resultLabel", "") if isinstance(result_data, dict) else "" - result_label_match = actual_result_label == expected_result_label - - # Analyze delivered documents and content - delivered_files = [] - delivered_formats = [] - content_items = [] - - # Check for ChatDocument objects - for doc in documents: - if hasattr(doc, 'filename'): - delivered_files.append(doc.filename) - # Extract format information - file_extension = self._getFileExtension(doc.filename) - mime_type = getattr(doc, 'mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc.filename, - 'extension': file_extension, - 'mimeType': mime_type - }) - elif isinstance(doc, dict) and 'filename' in doc: - delivered_files.append(doc['filename']) - file_extension = self._getFileExtension(doc['filename']) - mime_type = doc.get('mimeType', 'application/octet-stream') - delivered_formats.append({ - 'filename': doc['filename'], - 'extension': file_extension, - 'mimeType': mime_type - }) - else: - delivered_files.append(f"document_{len(delivered_files)}") - delivered_formats.append({ - 'filename': f"document_{len(delivered_files)}", - 'extension': 'unknown', - 'mimeType': 'application/octet-stream' - }) - - # Check for ExtractedContent in result data - if isinstance(result_data, dict): - if 'extractedContent' in result_data: - extracted_content = result_data['extractedContent'] - if hasattr(extracted_content, 'contents'): - content_items = extracted_content.contents - elif 'contents' in result_data: - content_items = result_data['contents'] - - # If we have delivered files but no content items, consider it successful - # This handles the case where content is stored in files rather than result data - if delivered_files and not content_items: - content_items = [f"File content available in: {', '.join(delivered_files)}"] - - # Analyze content items - content_summary = [] - for item in content_items: - if hasattr(item, 'label') and hasattr(item, 'metadata'): - content_summary.append(f"{item.label}: {item.metadata.mimeType if hasattr(item.metadata, 'mimeType') else 'unknown'}") - elif isinstance(item, str): - content_summary.append(item) - else: - content_summary.append(str(item)) - - return f"""You are an action result validator. Your primary focus is to validate that the action delivered the promised result files in the promised format. - -ACTION DETAILS: -- Method: {action.execMethod} -- Action: {action.execAction} -- Expected Result Label: {expected_result_label} -- Actual Result Label: {actual_result_label} -- Result Label Match: {result_label_match} -- Expected Format: {expected_format} -- Expected Document Formats: {json.dumps(expected_document_formats, indent=2) if expected_document_formats else 'None specified'} -- Parameters: {json.dumps(action.execParameters, indent=2)} - -RESULT TO VALIDATE: -- Success: {success} -- Result Data: {result_text[:500]}{'...' if len(result_text) > 500 else ''} -- Error: {error} -- Validation Messages: {', '.join(validation_messages) if validation_messages else 'None'} -- Documents Produced: {doc_count} -- Delivered Files: {', '.join(delivered_files) if delivered_files else 'None'} -- Delivered Formats: {json.dumps(delivered_formats, indent=2) if delivered_formats else 'None'} -- Content Items: {', '.join(content_summary) if content_summary else 'None'} - -CRITICAL VALIDATION CRITERIA: -1. **Result Label Match**: Does the action result contain the expected result label? -2. **File Delivery**: Did the action deliver the promised result file(s)? -3. **Format Compliance**: If expected document formats were specified, do the delivered files match the expected formats? -4. **Content Quality**: Is the content of the delivered files usable and complete? -5. **Content Processing**: If content extraction was expected, was it performed correctly? - -CONTEXT: -- Task Description: {context.task_step.description if context.task_step else 'Unknown'} -- Previous Results: {', '.join(context.previous_results) if context.previous_results else 'None'} - -VALIDATION INSTRUCTIONS: -1. **Result Label Check**: Verify that the expected result label "{expected_result_label}" is present in the action result data. This is the primary success criterion. -2. **File Delivery**: Check if files were delivered when expected. The individual filenames don't need to match the result label - focus on whether content was actually produced. -3. **Format Compliance**: If expected document formats were specified, check if delivered files match the expected extensions and MIME types. If no formats were specified, this criterion is satisfied. -4. **Content Quality**: If files were delivered, consider the action successful. The presence of delivered files indicates content was processed and stored. -5. **Content Processing**: If files were delivered, assume content extraction was performed correctly. The file delivery is evidence of successful processing. -6. **Success Criteria**: The action is successful if the result label matches AND files were delivered. If expected formats were specified, they should also match. - -IMPORTANT NOTES: -- The result label must be present in the action result data for success -- Individual filenames can be different from the result label -- If files were delivered, consider the action successful even if content details are not provided -- Focus on whether the action accomplished its intended purpose (file delivery) -- Empty files should be considered failures, but delivered files indicate success - -REQUIRED JSON RESPONSE: -{{ - "status": "success|retry|fail", - "reason": "Detailed explanation focusing on result label match and content quality", - "confidence": 0.0-1.0, - "improvements": ["specific improvements if needed"], - "quality_score": 1-10, - "missing_elements": ["missing result label", "missing files", "content issues"], - "suggested_retry_approach": "Specific approach for retry if status is retry" -}} - -NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" - - def _parseValidationResponse(self, response: str) -> dict: - """Parse the AI validation response""" - try: - json_start = response.find('{') - json_end = response.rfind('}') + 1 - if json_start == -1 or json_end == 0: - raise ValueError("No JSON found in validation response") - - json_str = response[json_start:json_end] - validation = json.loads(json_str) - - if 'status' not in validation: - raise ValueError("Validation response missing 'status' field") - - # Set defaults for optional fields - validation.setdefault('confidence', 0.5) - validation.setdefault('improvements', []) - validation.setdefault('quality_score', 5) - validation.setdefault('missing_elements', []) - validation.setdefault('suggested_retry_approach', '') - - return validation - except Exception as e: - logger.error(f"Error parsing validation response: {str(e)}") - return { - 'status': 'success', - 'reason': f'Parse error: {str(e)}', - 'confidence': 0.5, - 'improvements': [], - 'quality_score': 5, - 'missing_elements': [], - 'suggested_retry_approach': '' - } - - def _getFileExtension(self, filename: str) -> str: - """Extract file extension from filename""" - if '.' in filename: - return '.' + filename.split('.')[-1] - return '' - -class ChatManager: - """Chat manager with improved AI integration and method handling""" - - def __init__(self, currentUser: User, chatInterface: ChatObjects): - self.currentUser = currentUser - self.chatInterface = chatInterface - self.service: ServiceCenter = None - self.workflow: ChatWorkflow = None - - # Circuit breaker for AI calls - self.ai_failure_count = 0 - self.ai_last_failure_time = None - self.ai_circuit_breaker_threshold = 5 - self.ai_circuit_breaker_timeout = 300 # 5 minutes - - # Timeout settings - self.ai_call_timeout = 120 # 2 minutes - self.task_execution_timeout = 600 # 10 minutes - - # ===== Initialization and Setup ===== - async def initialize(self, workflow: ChatWorkflow) -> None: - """Initialize chat manager with workflow""" - self.workflow = workflow - self.service = ServiceCenter(self.currentUser, self.workflow) - - # ===== WORKFLOW PHASES ===== - - # Phase 1: High-Level Task Planning - async def planHighLevelTasks(self, userInput: str, workflow: ChatWorkflow) -> TaskPlan: - """Phase 1: Plan high-level tasks from user input""" - try: - logger.info(f"Planning high-level tasks for workflow {workflow.id}") - - # Create planning prompt - prompt = self.createTaskPlanningPrompt({ - 'user_request': userInput, - 'available_documents': self._getAvailableDocuments(workflow), - 'workflow_id': workflow.id - }) - - # Get AI response with fallback mechanism - response = await self._callAIWithCircuitBreaker(prompt, "task_planning") - - # Parse and validate task plan - task_plan_dict = self._parseTaskPlanResponse(response) - - if not self._validateTaskPlan(task_plan_dict): - logger.error("Generated task plan failed validation") - raise Exception("AI-generated task plan failed validation - AI is required for task planning") - - # Convert to TaskPlan model - tasks = [] - for task_dict in task_plan_dict.get('tasks', []): - task = TaskStep( - id=task_dict.get('id', ''), - description=task_dict.get('description', ''), - dependencies=task_dict.get('dependencies', []), - expected_outputs=task_dict.get('expected_outputs', []), - success_criteria=task_dict.get('success_criteria', []), - required_documents=task_dict.get('required_documents', []), - estimated_complexity=task_dict.get('estimated_complexity'), - ai_prompt=task_dict.get('ai_prompt') - ) - tasks.append(task) - - task_plan = TaskPlan( - overview=task_plan_dict.get('overview', ''), - tasks=tasks - ) - - # Log the task plan as JSON for debugging - logger.info(f"Task plan created for workflow {workflow.id}:") - task_plan_json = { - 'overview': task_plan.overview, - 'tasks_count': len(task_plan.tasks), - 'tasks': [] - } - for task in task_plan.tasks: - task_json = { - 'id': task.id, - 'description': task.description, - 'dependencies': task.dependencies or [], - 'expected_outputs': task.expected_outputs or [], - 'success_criteria': task.success_criteria or [], - 'required_documents': task.required_documents or [], - 'estimated_complexity': task.estimated_complexity or '', - 'ai_prompt': task.ai_prompt or '' - } - task_plan_json['tasks'].append(task_json) - logger.info(f"Task Plan: {json.dumps(task_plan_json, indent=2, ensure_ascii=False)}") - - logger.info(f"High-level task planning completed: {len(task_plan.tasks)} tasks") - return task_plan - - except Exception as e: - error_message = str(e) - logger.error(f"Error in high-level task planning: {error_message}") - - # Provide more specific error messages based on the error type - if "overloaded" in error_message.lower() or "529" in error_message: - detailed_error = "AI service is currently overloaded. Please try again in a few minutes." - elif "rate limit" in error_message.lower() or "429" in error_message: - detailed_error = "Rate limit exceeded. Please wait before making another request." - elif "api key" in error_message.lower() or "401" in error_message: - detailed_error = "Invalid API key. Please check your AI service configuration." - elif "timeout" in error_message.lower(): - detailed_error = "AI service request timed out. Please try again." - else: - detailed_error = f"AI service error: {error_message}" - - raise Exception(detailed_error) - - # Phase 2: Task Definition and Action Generation - async def defineTaskActions(self, task_step: TaskStep, workflow: ChatWorkflow, previous_results: List[str] = None, - enhanced_context: TaskContext = None) -> List[TaskAction]: - """Phase 2: Define specific actions for a task step with enhanced retry context""" - try: - logger.info(f"Defining actions for task: {task_step.description if hasattr(task_step, 'description') else 'Unknown'}") - - # Use enhanced context if provided (for retries), otherwise create basic context - if enhanced_context: - context = enhanced_context - else: - context = TaskContext( - task_step=task_step, - workflow=workflow, - workflow_id=workflow.id, - available_documents=self._getAvailableDocuments(workflow), - previous_results=previous_results or [], - improvements=[], - retry_count=0, - previous_action_results=[], - previous_review_result=None, - is_regeneration=False, - failure_patterns=[], - failed_actions=[], - successful_actions=[] - ) - - # Generate actions using AI - actions = await self._generateActionsForTaskStep(context) - - # Log the generated actions as JSON for debugging - logger.info(f"Generated {len(actions)} actions for task '{task_step.description}':") - for i, action in enumerate(actions): - logger.info(f"Action {i+1}: {json.dumps(action, indent=2, ensure_ascii=False)}") - - # Convert to TaskAction objects - # Get available document labels for validation - available_document_labels = set(self._getAvailableDocuments(workflow)) - task_actions = [] - invalid_doc_ref_detected = False - # Collect resultLabels of actions defined so far in this step - result_labels_so_far = set() - for action_dict in actions: - # Validate document references in parameters - params = action_dict.get('parameters', {}) - if 'documentList' in params and isinstance(params['documentList'], list): - original_refs = params['documentList'] - # Allow references to available documents or to resultLabels of actions defined so far - valid_refs = [ref for ref in original_refs if ref in available_document_labels or ref in result_labels_so_far] - if len(valid_refs) < len(original_refs): - logger.warning(f"Action {action_dict.get('method','?')}.{action_dict.get('action','?')} has invalid document references: {set(original_refs) - set(valid_refs)}. Only using valid: {valid_refs}") - invalid_doc_ref_detected = True - if not valid_refs: - logger.warning(f"Skipping action {action_dict.get('method','?')}.{action_dict.get('action','?')} due to no valid document references.") - continue - params['documentList'] = valid_refs - action_data = { - "execMethod": action_dict.get('method', 'unknown'), - "execAction": action_dict.get('action', 'unknown'), - "execParameters": params, - "execResultLabel": action_dict.get('resultLabel', ''), - "expectedDocumentFormats": action_dict.get('expectedDocumentFormats', None), - "status": TaskStatus.PENDING - } - task_action = self.chatInterface.createTaskAction(action_data) - if task_action: - # Log action definition: parameters, input documentLabels, output document label - logger.debug(f"[ACTION DEFINITION] Method: {task_action.execMethod}, Action: {task_action.execAction}, Parameters: {json.dumps(task_action.execParameters, ensure_ascii=False)}, Input documentLabels: {task_action.execParameters.get('documentList', [])}, Output documentLabel: {task_action.execResultLabel}") - task_actions.append(task_action) - # Add this action's resultLabel to the running set for subsequent actions - if action_data["execResultLabel"]: - result_labels_so_far.add(action_data["execResultLabel"]) - logger.info(f"Created task action: {task_action.execMethod}.{task_action.execAction}") - # If all actions were skipped due to invalid document references, add improvement and return [] - if not task_actions and invalid_doc_ref_detected: - improvement_msg = ("Previous action(s) referenced invalid or unavailable document labels. " - "Only use document labels listed in AVAILABLE DOCUMENTS. Do not invent or copy message IDs.") - if enhanced_context: - if hasattr(enhanced_context, 'improvements') and isinstance(enhanced_context.improvements, list): - enhanced_context.improvements.append(improvement_msg) - else: - if hasattr(context, 'improvements') and isinstance(context.improvements, list): - context.improvements.append(improvement_msg) - logger.warning("All actions skipped due to invalid document references. Added improvement for retry.") - return [] - - # Update stats for task validation (estimate bytes for action validation) - if task_actions: - # Calculate actual action size for stats - action_size = self.service.calculateObjectSize(task_actions) - self.service.updateWorkflowStats(eventLabel="action", bytesSent=action_size) - - # Log the final TaskAction objects as JSON - logger.info(f"Final TaskAction objects for task '{task_step.description}':") - for i, task_action in enumerate(task_actions): - action_json = { - 'id': task_action.id, - 'execMethod': task_action.execMethod, - 'execAction': task_action.execAction, - 'execParameters': task_action.execParameters, - 'execResultLabel': task_action.execResultLabel, - 'status': task_action.status.value if hasattr(task_action.status, 'value') else str(task_action.status) - } - logger.info(f"TaskAction {i+1}: {json.dumps(action_json, indent=2, ensure_ascii=False)}") - - logger.info(f"Task action definition completed: {len(task_actions)} actions") - return task_actions - - except Exception as e: - logger.error(f"Error defining task actions: {str(e)}") - return [] - - # Phase 3: Action Execution - async def executeTaskActions(self, task_actions: List[TaskAction], workflow: ChatWorkflow) -> List[ActionExecutionResult]: - """Phase 3: Execute all actions for a task with retry mechanism""" - try: - logger.info(f"Executing {len(task_actions)} task actions") - - results = [] - for i, action in enumerate(task_actions): - logger.info(f"Executing action {i+1}/{len(task_actions)}: {action.execMethod}.{action.execAction}") - - # Execute single action with retry mechanism - result = await self._executeSingleAction(action, workflow) - results.append(result) - - # If action failed after all retries, continue with next action instead of stopping - if not result.success: - logger.error(f"Action {i+1} failed after retries, continuing with next action") - # Don't break - continue with remaining actions - continue - - logger.info(f"Task action execution completed: {len(results)} results") - return results - - except Exception as e: - logger.error(f"Error executing task actions: {str(e)}") - return [] - - # Phase 4: Task Review and Quality Assessment - async def reviewTaskCompletion(self, task_step: TaskStep, task_actions: List[TaskAction], - action_results: List[ActionExecutionResult], workflow: ChatWorkflow) -> ReviewResult: - """Phase 4: Review task completion and decide next steps""" - try: - logger.info(f"Reviewing task completion: {task_step.description}") - - # Create step result summary from action results - step_result = { - 'task_step': task_step, - 'action_results': action_results, - 'successful_actions': sum(1 for result in action_results if result.success), - 'total_actions': len(action_results), - 'results': [result.data.get('result', '') for result in action_results if result.success], - 'errors': [result.error for result in action_results if not result.success] - } - - # Prepare review context - review_context = ReviewContext( - task_step=task_step, - task_actions=task_actions, - action_results=action_results, - step_result=step_result, - workflow_id=workflow.id, - previous_results=self._getPreviousResultsFromActions(task_actions) - ) - - # Use AI to review the results - review = await self._performTaskReview(review_context) - - # Add quality metrics - quality_metrics = self._calculateTaskQualityMetrics(task_step, action_results) - - logger.info(f"Task review completed: {review.status}") - return ReviewResult( - status=review.status, - reason=review.reason, - improvements=review.improvements, - quality_score=review.quality_score, - missing_outputs=review.missing_outputs, - met_criteria=review.met_criteria, - unmet_criteria=review.unmet_criteria, - confidence=review.confidence - ) - - except Exception as e: - logger.error(f"Error reviewing task completion: {str(e)}") - return ReviewResult( - status='failed', - reason=f'Review failed: {str(e)}', - quality_score=0, - confidence=0 - ) - - # Phase 5: Task Handover and State Management - async def prepareTaskHandover(self, task_step: TaskStep, task_actions: List[TaskAction], - review_result: ReviewResult, workflow: ChatWorkflow) -> Dict[str, Any]: - """Phase 5: Prepare results for next task or workflow completion""" - try: - logger.info(f"Preparing task handover: {task_step.description}") - - # Update task actions with results - for action in task_actions: - if action.status == TaskStatus.PENDING: - action.status = TaskStatus.COMPLETED if review_result.status == 'success' else TaskStatus.FAILED - - # Create serializable task actions - task_actions_serializable = [] - for action in task_actions: - action_dict = { - 'id': action.id, - 'execMethod': action.execMethod, - 'execAction': action.execAction, - 'execParameters': action.execParameters, - 'execResultLabel': action.execResultLabel, - 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) - } - task_actions_serializable.append(action_dict) - - # Create handover data - handover_data = { - 'task_step': task_step, - 'task_actions': task_actions_serializable, - 'review_result': review_result, - 'next_task_ready': review_result.status == 'success', - 'available_results': self._getPreviousResultsFromActions(task_actions) - } - - logger.info(f"Task handover prepared: next_task_ready={handover_data['next_task_ready']}") - return handover_data - - except Exception as e: - logger.error(f"Error preparing task handover: {str(e)}") - # Create serializable task actions for exception case - task_actions_serializable = [] - for action in task_actions: - action_dict = { - 'id': action.id, - 'execMethod': action.execMethod, - 'execAction': action.execAction, - 'execParameters': action.execParameters, - 'execResultLabel': action.execResultLabel, - 'status': action.status.value if hasattr(action.status, 'value') else str(action.status) - } - task_actions_serializable.append(action_dict) - - return { - 'task_step': task_step, - 'task_actions': task_actions_serializable, - 'review_result': review_result, - 'next_task_ready': False, - 'available_results': [] - } - - - - - - # ===== Utility Methods ===== - - async def processFileIds(self, fileIds: List[str]) -> List[ChatDocument]: - """Process file IDs and return ChatDocument objects""" - documents = [] - - for fileId in fileIds: - try: - # Ensure service is initialized - if not hasattr(self, 'service') or not self.service: - logger.error(f"Service not initialized for file ID {fileId}") - continue - - # Get file info from service - fileInfo = self.service.getFileInfo(fileId) - if fileInfo: - # Create document using interface - documentData = { - "fileId": fileId, - "filename": fileInfo.get("filename", "unknown"), - "fileSize": fileInfo.get("size", 0), - "mimeType": fileInfo.get("mimeType", "application/octet-stream") - } - document = self.chatInterface.createChatDocument(documentData) - if document: - documents.append(document) - logger.info(f"Processed file ID {fileId} -> {document.filename}") - else: - logger.warning(f"No file info found for file ID {fileId}") - except Exception as e: - logger.error(f"Error processing file ID {fileId}: {str(e)}") - - - return documents - - def setUserLanguage(self, language: str) -> None: - """Set user language for the chat manager""" - if hasattr(self, 'service') and self.service: - self.service.user.language = language - - # ===== Enhanced Task Planning Methods ===== - - async def _callAIWithCircuitBreaker(self, prompt: str, context: str) -> str: - """Call AI with intelligent routing based on complexity and circuit breaker pattern""" - max_retries = 3 - base_delay = 2 # Start with 2 seconds - - for attempt in range(max_retries): - try: - # Check circuit breaker - if self._isCircuitBreakerOpen(): - raise Exception("AI circuit breaker is open - too many recent failures") - - # Determine which AI service to use based on complexity - ai_choice = self._determineAIChoice(prompt, context) - logger.debug(f"AI choice for {context}: {ai_choice} (attempt {attempt + 1}/{max_retries})") - - if ai_choice == "advanced": - # Use advanced AI for complex tasks - try: - response = await asyncio.wait_for( - self._callAdvancedAI(prompt, context), - timeout=self.ai_call_timeout - ) - - # Reset failure count on success - self.ai_failure_count = 0 - logger.info(f"Advanced AI call successful for {context}") - return response - - except Exception as advanced_error: - error_message = str(advanced_error) - logger.warning(f"Advanced AI call failed for {context}: {error_message}") - - # Fall back to basic AI for complex tasks - logger.info(f"Falling back to basic AI for complex task: {context}") - try: - response = await asyncio.wait_for( - self._callStandardAI(prompt, context), - timeout=self.ai_call_timeout - ) - - # Reset failure count on success - self.ai_failure_count = 0 - logger.info(f"Basic AI fallback successful for complex task: {context}") - return response - - except Exception as standard_error: - # Both failed for complex task - error_message = f"Advanced AI failed: {str(advanced_error)}. Basic AI failed: {str(standard_error)}" - raise Exception(error_message) - - else: # basic - # Use basic AI for simple tasks - try: - response = await asyncio.wait_for( - self._callStandardAI(prompt, context), - timeout=self.ai_call_timeout - ) - - # Reset failure count on success - self.ai_failure_count = 0 - logger.info(f"Basic AI call successful for {context}") - return response - - except Exception as basic_error: - error_message = str(basic_error) - logger.warning(f"Basic AI call failed for {context}: {error_message}") - - # Only upgrade to advanced AI for critical simple tasks - if self._isCriticalTask(context): - logger.info(f"Upgrading to advanced AI for critical simple task: {context}") - try: - response = await asyncio.wait_for( - self._callAdvancedAI(prompt, context), - timeout=self.ai_call_timeout - ) - - # Reset failure count on success - self.ai_failure_count = 0 - logger.info(f"Advanced AI upgrade successful for critical task: {context}") - return response - - except Exception as advanced_error: - # Both failed for critical task - error_message = f"Basic AI failed: {str(basic_error)}. Advanced AI failed: {str(advanced_error)}" - raise Exception(error_message) - else: - # Non-critical simple task failed - raise Exception(f"Basic AI failed for simple task: {error_message}") - - except asyncio.TimeoutError: - self._recordAIFailure("Timeout") - if attempt < max_retries - 1: - delay = base_delay * (2 ** attempt) # Exponential backoff - logger.warning(f"AI call timed out, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})") - await asyncio.sleep(delay) - continue - else: - raise Exception(f"AI call timed out after {self.ai_call_timeout} seconds") - - except Exception as e: - error_message = str(e) - - # Special handling for overloaded service (529 error) - if "overloaded" in error_message.lower() or "529" in error_message: - if attempt < max_retries - 1: - delay = base_delay * (2 ** attempt) # Exponential backoff - logger.warning(f"AI service overloaded, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries})") - await asyncio.sleep(delay) - continue - else: - # Don't record this as a circuit breaker failure since it's a service issue - raise Exception("AI service is currently overloaded. Please try again in a few minutes.") - - # For other errors, record failure and potentially retry - self._recordAIFailure(error_message) - if attempt < max_retries - 1: - delay = base_delay * (2 ** attempt) # Exponential backoff - logger.warning(f"AI call failed, retrying in {delay} seconds (attempt {attempt + 1}/{max_retries}): {error_message}") - await asyncio.sleep(delay) - continue - else: - raise - - def _isCircuitBreakerOpen(self) -> bool: - """Check if circuit breaker is open""" - if self.ai_failure_count >= self.ai_circuit_breaker_threshold: - if self.ai_last_failure_time: - time_since_failure = (datetime.now(UTC) - self.ai_last_failure_time).total_seconds() - if time_since_failure < self.ai_circuit_breaker_timeout: - return True - else: - # Reset circuit breaker after timeout - self.ai_failure_count = 0 - self.ai_last_failure_time = None - return False - - def _determineAIChoice(self, prompt: str, context: str) -> str: - """Determine whether to use advanced or basic AI based on task complexity""" - - # Check for forced AI choice based on context - forced_choice = self._getForcedAIChoice(context) - if forced_choice: - logger.debug(f"Forced AI choice for {context}: {forced_choice}") - return forced_choice - - # Define complex task patterns that require advanced AI - complex_patterns = [ - # Task planning and workflow management - "task_planning", "action_generation", "result_review", "task_completion_validation", - - # Complex document analysis - "document", "extract", "analysis", "comprehensive", "detailed analysis", - - # Multi-step reasoning - "plan", "strategy", "evaluate", "assess", "compare", "analyze", - - # Complex business logic - "workflow", "task", "action", "validation", "review", "assessment", - - # Critical decision making - "decision", "recommendation", "evaluation", "quality", "success criteria", - - # Complex prompts - "JSON", "structured", "format", "validation", "improvements", "quality_score" - ] - - # Define simple task patterns that can use basic AI - simple_patterns = [ - # Basic text processing - "summarize", "translate", "format", "convert", "extract text", - - # Simple queries - "find", "search", "list", "get", "retrieve", - - # Basic operations - "send", "upload", "download", "create", "delete", - - # Simple responses - "confirm", "acknowledge", "status", "info" - ] - - # Check prompt and context for complexity indicators - combined_text = f"{prompt} {context}".lower() - - # Count complex indicators - complex_count = sum(1 for pattern in complex_patterns if pattern in combined_text) - - # Count simple indicators - simple_count = sum(1 for pattern in simple_patterns if pattern in combined_text) - - # Additional complexity factors - prompt_length = len(prompt) - has_json_requirement = "json" in combined_text and ("{" in prompt or "}" in prompt) - has_structured_output = any(word in combined_text for word in ["format", "structure", "template"]) - has_validation = any(word in combined_text for word in ["validate", "check", "verify", "quality"]) - - # Calculate complexity score - complexity_score = 0 - complexity_score += complex_count * 2 # Complex patterns worth more - complexity_score += simple_count * 1 # Simple patterns worth less - complexity_score += (prompt_length > 1000) * 3 # Long prompts are complex - complexity_score += has_json_requirement * 5 # JSON requirements are complex - complexity_score += has_structured_output * 3 # Structured output is complex - complexity_score += has_validation * 4 # Validation is complex - - # Determine AI choice based on complexity score - if complexity_score >= 5: - logger.debug(f"Complex task detected (score: {complexity_score}) - using advanced AI for {context}") - return "advanced" - else: - logger.debug(f"Simple task detected (score: {complexity_score}) - using basic AI for {context}") - return "basic" - - def _getForcedAIChoice(self, context: str) -> str: - """Get forced AI choice for specific contexts (can be overridden)""" - - # Define contexts that always use advanced AI - advanced_contexts = [ - "task_planning", # Always use advanced for task planning - "action_generation", # Always use advanced for action generation - "result_review", # Always use advanced for result review - "task_completion_validation" # Always use advanced for validation - ] - - # Define contexts that always use basic AI - basic_contexts = [ - "summarize", # Always use basic for summarization - "translate", # Always use basic for translation - "format", # Always use basic for formatting - "status", # Always use basic for status updates - "info" # Always use basic for info queries - ] - - context_lower = context.lower() - - # Check for forced advanced AI - for advanced_context in advanced_contexts: - if advanced_context in context_lower: - return "advanced" - - # Check for forced basic AI - for basic_context in basic_contexts: - if basic_context in context_lower: - return "basic" - - # No forced choice - return None - - def _isCriticalTask(self, context: str) -> bool: - """Determine if a simple task is critical enough to warrant advanced AI upgrade""" - - # Define critical task patterns - critical_patterns = [ - # Workflow critical tasks - "task_planning", "workflow", "critical", "essential", - - # User-facing decisions - "decision", "recommendation", "evaluation", "assessment", - - # Quality-sensitive tasks - "quality", "validation", "review", "check", - - # Business-critical operations - "business", "strategy", "planning", "analysis" - ] - - context_lower = context.lower() - - # Check if context contains critical patterns - is_critical = any(pattern in context_lower for pattern in critical_patterns) - - if is_critical: - logger.debug(f"Critical task detected - {context}") - - return is_critical - - def _recordAIFailure(self, error: str): - """Record AI failure for circuit breaker""" - self.ai_failure_count += 1 - self.ai_last_failure_time = datetime.now(UTC) - logger.warning(f"AI failure recorded ({self.ai_failure_count}/{self.ai_circuit_breaker_threshold}): {error}") - - def _validateTaskPlan(self, task_plan: Dict[str, Any]) -> bool: - """Validate task plan structure and dependencies""" - try: - if not isinstance(task_plan, dict): - return False - - if 'tasks' not in task_plan or not isinstance(task_plan['tasks'], list): - return False - - # Check each task - task_ids = set() - for task in task_plan['tasks']: - if not isinstance(task, dict): - return False - - required_fields = ['id', 'description', 'expected_outputs', 'success_criteria'] - if not all(field in task for field in required_fields): - return False - - # Check for duplicate task IDs - if task['id'] in task_ids: - return False - task_ids.add(task['id']) - - # Validate dependencies - dependencies = task.get('dependencies', []) - if not isinstance(dependencies, list): - return False - - # Check that dependencies reference existing tasks - for dep in dependencies: - if dep not in task_ids and dep != 'task_0': # Allow task_0 as special case - return False - - # Validate ai_prompt if present (optional field) - if 'ai_prompt' in task and not isinstance(task['ai_prompt'], str): - return False - - return True - - except Exception as e: - logger.error(f"Error validating task plan: {str(e)}") - return False - - - def _validateActions(self, actions: List[Dict[str, Any]], context: TaskContext) -> bool: - """Validate generated actions""" - try: - if not isinstance(actions, list): - logger.error("Actions must be a list") - return False - - if len(actions) == 0: - logger.warning("No actions generated") - return False - - for i, action in enumerate(actions): - if not isinstance(action, dict): - logger.error(f"Action {i} must be a dictionary") - return False - - # Check required fields - required_fields = ['method', 'action', 'parameters', 'resultLabel'] - missing_fields = [] - for field in required_fields: - if field not in action or not action[field]: - missing_fields.append(field) - - if missing_fields: - logger.error(f"Action {i} missing required fields: {missing_fields}") - return False - - # Validate result label format - result_label = action.get('resultLabel', '') - if not result_label.startswith('task'): - logger.error(f"Action {i} result label must start with 'task': {result_label}") - return False - - # Validate parameters - parameters = action.get('parameters', {}) - if not isinstance(parameters, dict): - logger.error(f"Action {i} parameters must be a dictionary") - return False - - logger.info(f"Successfully validated {len(actions)} actions") - return True - - except Exception as e: - logger.error(f"Error validating actions: {str(e)}") - return False - - - - # ===== Prompt Creation Methods ===== - - def createTaskPlanningPrompt(self, context: Dict[str, Any]) -> str: - """Create prompt for task planning""" - return f"""You are a task planning AI that analyzes user requests and creates structured task plans. - -USER REQUEST: {context['user_request']} - -AVAILABLE DOCUMENTS: {', '.join(context['available_documents'])} - -INSTRUCTIONS: -1. Analyze the user request and available documents -2. Break down the request into 2-4 meaningful high-level task steps -3. Focus on business outcomes, not technical operations -4. For document processing, create ONE task with a comprehensive AI prompt rather than multiple granular tasks -5. Each task should produce meaningful, usable outputs -6. Ensure proper handover between tasks using result labels -7. Return a JSON object with the exact structure shown below - -TASK PLANNING PRINCIPLES: -- Combine related operations into single tasks (e.g., "Extract and analyze all candidate profiles" instead of separate "read file" and "analyze content" tasks) -- Use comprehensive AI prompts for document processing rather than multiple small tasks -- Focus on business value and outcomes -- Keep tasks at a meaningful level of abstraction -- Each task should produce results that can be used by subsequent tasks - -REQUIRED JSON STRUCTURE: -{{ - "overview": "Brief description of the overall plan", - "tasks": [ - {{ - "id": "task_1", - "description": "Clear description of what this task accomplishes (business outcome)", - "dependencies": ["task_0"], // IDs of tasks that must complete first - "expected_outputs": ["output1", "output2"], - "success_criteria": ["criteria1", "criteria2"], - "required_documents": ["doc1", "doc2"], - "estimated_complexity": "low|medium|high", - "ai_prompt": "Comprehensive AI prompt for document processing tasks (if applicable)" - }} - ] -}} - -EXAMPLES OF GOOD TASK DESCRIPTIONS: -- "Extract and analyze all candidate profiles to identify key qualifications and experience" -- "Create evaluation matrix and rate candidates against product designer criteria" -- "Generate comprehensive PowerPoint presentation for management decision" -- "Store final presentation in SharePoint for specified account" - -EXAMPLES OF BAD TASK DESCRIPTIONS: -- "Open and read the PDF file" (too granular) -- "Identify table structure" (technical detail) -- "Convert data to CSV format" (implementation detail) - -NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" - - async def createActionDefinitionPrompt(self, context: TaskContext) -> str: - """Create prompt for action generation with enhanced document extraction guidance and retry context""" - task_step = context.task_step - workflow = context.workflow - available_docs = context.available_documents or [] - previous_results = context.previous_results or [] - improvements = context.improvements or [] - retry_count = context.retry_count or 0 - previous_action_results = context.previous_action_results or [] - previous_review_result = context.previous_review_result - - # Get available methods and actions with signatures - methodList = self.service.getMethodsList() - method_actions = {} - for sig in methodList: - if '.' in sig: - method, rest = sig.split('.', 1) - action = rest.split('(')[0] - method_actions.setdefault(method, []).append((action, sig)) - - # Get workflow history - messageSummary = await self.service.summarizeChat(workflow.messages) - - # Get available documents and connections - docRefs = self.service.getDocumentReferenceList() - connRefs = self.service.getConnectionReferenceList() - all_doc_refs = docRefs.get('chat', []) + docRefs.get('history', []) - - # Build AVAILABLE METHODS section - available_methods_str = '' - for method, actions in method_actions.items(): - available_methods_str += f"- {method}:\n" - for action, sig in actions: - available_methods_str += f" - {action}: {sig}\n" - - # Get AI prompt from task step if available - task_ai_prompt = task_step.ai_prompt or '' - - # Build retry context section - retry_context = "" - if retry_count > 0: - retry_context = f""" -RETRY CONTEXT (Attempt {retry_count}): -Previous action results that failed or were incomplete: -""" - for i, result in enumerate(previous_action_results): - retry_context += f"- Action {i+1}: {result.actionMethod or 'unknown'}.{result.actionName or 'unknown'}\n" - retry_context += f" Status: {result.success and 'success' or 'failed'}\n" - retry_context += f" Error: {result.error or 'None'}\n" - retry_context += f" Result: {(result.data.get('result', '') if result.data else '')[:100]}...\n" - - if previous_review_result: - retry_context += f""" -Previous review feedback: -- Status: {previous_review_result.status or 'unknown'} -- Reason: {previous_review_result.reason or 'No reason provided'} -- Quality Score: {previous_review_result.quality_score or 0}/10 -- Missing Outputs: {', '.join(previous_review_result.missing_outputs or [])} -- Unmet Criteria: {', '.join(previous_review_result.unmet_criteria or [])} -""" - - # Precompute all complex string expressions to avoid f-string nesting issues - expected_outputs_str = ', '.join(task_step.expected_outputs or []) - success_criteria_str = ', '.join(task_step.success_criteria or []) - previous_results_str = ', '.join(previous_results) if previous_results else 'None' - improvements_str = str(improvements) if improvements else 'None' - available_connections_str = '\n'.join(f"- {conn}" for conn in connRefs) - available_documents_str = '\n'.join(f"- {doc.documentsLabel} contains {', '.join(doc.documents)}" for doc in all_doc_refs) - # Build the prompt using only precomputed variables - prompt = f""" -You are an action generation AI that creates specific actions to accomplish a task step. - -DOCUMENT REFERENCE TYPES: -- docItem: Reference to a single document. Format: "docItem::" -- docList: Reference to a group of documents under a label. Format: