# gateway/modules/workflows/processing/adaptive/adaptiveLearningEngine.py
#
# File listing metadata: 271 lines, 13 KiB, Python

# adaptiveLearningEngine.py
# Enhanced learning engine that tracks validation patterns and adapts prompts
import json
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timezone
from collections import defaultdict
logger = logging.getLogger(__name__)


class AdaptiveLearningEngine:
    """Track validation outcomes per workflow and derive prompt guidance from them.

    All state is kept in memory on the instance. ``validationHistory`` grows
    with every recorded result and is scanned linearly by the query methods.
    """

    def __init__(self):
        # Every recorded validation entry, in insertion order.
        self.validationHistory: List[Dict[str, Any]] = []
        # Failed entries grouped by action type.
        self.failurePatterns = defaultdict(list)
        # Successful entries grouped by action type.
        self.successPatterns = defaultdict(list)
        # Number of recorded results per "<workflowId>:<actionType>" key.
        self.actionAttempts = defaultdict(int)
        # Latest summary dict per workflow id (see _generateLearningInsights).
        self.learningInsights: Dict[str, Dict[str, Any]] = {}

    def recordValidationResult(self, validationResult: Dict[str, Any], actionContext: Dict[str, Any],
                               workflowId: str, attemptNumber: int):
        """Store one validation result and refresh the workflow's insights.

        Args:
            validationResult: Validator output; the keys ``overallSuccess``,
                ``qualityScore`` and (later) ``validationDetails`` are read.
            actionContext: Context of the validated action; ``actionType`` and
                ``actionName`` are read, both defaulting to ``'unknown'``.
            workflowId: Identifier used to group entries per workflow.
            attemptNumber: Caller-supplied attempt counter, stored as given.

        Recording is best-effort: any exception is logged (with traceback)
        and swallowed so that a bookkeeping failure never propagates.
        """
        try:
            actionType = actionContext.get('actionType', 'unknown')
            actionName = actionContext.get('actionName', 'unknown')
            success = validationResult.get('overallSuccess', False)
            qualityScore = validationResult.get('qualityScore', 0.0)

            validationEntry = {
                'workflowId': workflowId,
                'actionType': actionType,
                'actionName': actionName,
                'attemptNumber': attemptNumber,
                'validationResult': validationResult,
                'actionContext': actionContext,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'success': success,
                'qualityScore': qualityScore,
            }
            self.validationHistory.append(validationEntry)

            # Track the entry under the matching success/failure bucket.
            if success:
                self.successPatterns[actionType].append(validationEntry)
            else:
                self.failurePatterns[actionType].append(validationEntry)

            self.actionAttempts[f"{workflowId}:{actionType}"] += 1
            self._generateLearningInsights(workflowId, actionType)

            logger.info(
                "Recorded validation for %s (attempt %s): Success=%s, Quality=%s",
                actionType, attemptNumber, success, qualityScore,
            )
        except Exception as e:
            # Deliberately broad: see docstring. logger.exception keeps the traceback.
            logger.exception("Error recording validation result: %s", e)

    def getAdaptiveContextForActionSelection(self, workflowId: str, userPrompt: str) -> Dict[str, Any]:
        """Build the adaptive context for an action-selection prompt.

        Looks at the last five recorded validations of ``workflowId``.

        Returns:
            A dict with ``recentValidations``, ``failureAnalysis``,
            ``adaptiveGuidance``, ``learningInsights`` and ``escalationLevel``;
            an empty dict if anything goes wrong (the error is logged).
        """
        try:
            recentValidations = [
                entry for entry in self.validationHistory
                if entry['workflowId'] == workflowId
            ][-5:]
            failureAnalysis = self._analyzeFailurePatterns(recentValidations)
            adaptiveGuidance = self._generateActionGuidance(userPrompt, recentValidations, failureAnalysis)
            return {
                'recentValidations': recentValidations,
                'failureAnalysis': failureAnalysis,
                'adaptiveGuidance': adaptiveGuidance,
                'learningInsights': self.learningInsights.get(workflowId, {}),
                'escalationLevel': self._getEscalationLevel(workflowId),
            }
        except Exception as e:
            logger.exception("Error generating adaptive context: %s", e)
            return {}

    def getAdaptiveContextForParameters(self, workflowId: str, actionType: str,
                                        parametersContext: str) -> Dict[str, Any]:
        """Build the adaptive context for a parameter-selection prompt.

        Only the last three validations of ``actionType`` within ``workflowId``
        are analysed, but ``attemptNumber`` is derived from *all* matching
        entries so it keeps increasing past the size of that window.

        Returns:
            A dict with ``actionValidations``, ``failureAnalysis``,
            ``parameterGuidance``, ``attemptNumber`` (previous attempts + 1)
            and ``escalationLevel``; an empty dict on error (logged).
        """
        try:
            matchingValidations = [
                entry for entry in self.validationHistory
                if entry['workflowId'] == workflowId and entry['actionType'] == actionType
            ]
            # Count before truncating; the window alone would cap the number.
            totalAttempts = len(matchingValidations)
            actionValidations = matchingValidations[-3:]

            failureAnalysis = self._analyzeParameterFailures(actionValidations, totalAttempts)
            parameterGuidance = self._generateParameterGuidance(actionType, parametersContext, failureAnalysis)
            return {
                'actionValidations': actionValidations,
                'failureAnalysis': failureAnalysis,
                'parameterGuidance': parameterGuidance,
                'attemptNumber': totalAttempts + 1,
                'escalationLevel': self._getEscalationLevel(workflowId),
            }
        except Exception as e:
            logger.exception("Error generating parameter context: %s", e)
            return {}

    @staticmethod
    def _countIssues(failedValidations: List[Dict[str, Any]]) -> Dict[Any, int]:
        """Count the issues listed in ``validationDetails`` of the given entries.

        Tolerates loosely shaped validator output: a missing/``None``
        ``validationDetails`` or ``issues`` value counts as empty, a single
        dict or scalar is treated as a one-element list (so a plain string is
        not split into characters), non-dict detail entries are skipped and
        unhashable issues are keyed by their ``str()`` form.

        Returns:
            Mapping issue -> occurrence count, ordered by descending count.
        """
        issueCounts = defaultdict(int)
        for validation in failedValidations:
            details = validation['validationResult'].get('validationDetails') or []
            if isinstance(details, dict):
                details = [details]
            elif not isinstance(details, (list, tuple)):
                details = []
            for detail in details:
                if not isinstance(detail, dict):
                    continue
                issues = detail.get('issues') or []
                if not isinstance(issues, (list, tuple, set, frozenset)):
                    issues = [issues]
                for issue in issues:
                    try:
                        hash(issue)
                    except TypeError:
                        issue = str(issue)
                    issueCounts[issue] += 1
        # sorted() is stable, so equally frequent issues keep first-seen order.
        return dict(sorted(issueCounts.items(), key=lambda item: item[1], reverse=True))

    @staticmethod
    def _issuesMention(commonIssues: Dict[Any, int], keyword: str) -> bool:
        """Return True if any issue's text contains ``keyword`` (case-insensitive)."""
        return any(keyword in str(issue).lower() for issue in commonIssues)

    def _analyzeFailurePatterns(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise the failures contained in ``validations``.

        Returns:
            ``{}`` for an empty input, ``{'hasFailures': False}`` when no entry
            failed, otherwise a dict with ``hasFailures``, ``failureCount``,
            ``commonIssues`` (issue -> count, most frequent first) and
            ``lastFailure`` (the most recent failed entry).
        """
        if not validations:
            return {}
        failedValidations = [entry for entry in validations if not entry['success']]
        if not failedValidations:
            return {'hasFailures': False}
        return {
            'hasFailures': True,
            'failureCount': len(failedValidations),
            'commonIssues': self._countIssues(failedValidations),
            'lastFailure': failedValidations[-1],
        }

    def _analyzeParameterFailures(self, validations: List[Dict[str, Any]],
                                  totalAttempts: Optional[int] = None) -> Dict[str, Any]:
        """Summarise the failures of one action type.

        Args:
            validations: Entries to analyse (typically a recent window).
            totalAttempts: Number of attempts made overall; when omitted the
                length of ``validations`` is reported instead.

        Returns:
            ``{'hasFailures': False}`` when there is nothing failed to report,
            otherwise the same keys as ``_analyzeFailurePatterns`` plus
            ``attemptNumber``.
        """
        failedValidations = [entry for entry in validations if not entry['success']]
        if not failedValidations:
            return {'hasFailures': False}
        return {
            'hasFailures': True,
            'failureCount': len(failedValidations),
            'commonIssues': self._countIssues(failedValidations),
            'lastFailure': failedValidations[-1],
            'attemptNumber': len(validations) if totalAttempts is None else totalAttempts,
        }

    def _generateActionGuidance(self, userPrompt: str, validations: List[Dict[str, Any]],
                                failureAnalysis: Dict[str, Any]) -> str:
        """Render action-selection guidance text from a failure analysis.

        ``validations`` is accepted for interface compatibility and not read.
        """
        if not failureAnalysis.get('hasFailures', False):
            return "No previous failures detected. Proceed with standard approach."

        guidanceParts = []

        failureCount = failureAnalysis.get('failureCount', 0)
        if failureCount >= 3:
            guidanceParts.append("CRITICAL: Multiple previous attempts have failed. Consider alternative approaches.")
        elif failureCount >= 1:
            guidanceParts.append("WARNING: Previous attempts have failed. Learn from validation feedback.")

        commonIssues = failureAnalysis.get('commonIssues', {})
        if commonIssues:
            guidanceParts.append("COMMON FAILURE PATTERNS:")
            for issue, count in list(commonIssues.items())[:3]:  # Top 3 issues
                guidanceParts.append(f"- {issue} (occurred {count} times)")

        # A missing prompt simply yields no prompt-specific hints.
        promptText = (userPrompt or "").lower()
        if "email" in promptText and "outlook" in promptText:
            # NOTE(review): all three hints are e-mail specific and are kept
            # inside this branch - confirm this matches the intended nesting.
            if self._issuesMention(commonIssues, "account"):
                guidanceParts.append("SPECIFIC GUIDANCE: Ensure email is sent from the correct account (valueon).")
            if self._issuesMention(commonIssues, "attachment"):
                guidanceParts.append("SPECIFIC GUIDANCE: Verify PDF attachment is properly included.")
            if self._issuesMention(commonIssues, "summary"):
                guidanceParts.append("SPECIFIC GUIDANCE: Include German summary in email body.")

        return "\n".join(guidanceParts) if guidanceParts else "No specific guidance available."

    def _generateParameterGuidance(self, actionType: str, parametersContext: str,
                                   failureAnalysis: Dict[str, Any]) -> str:
        """Render parameter guidance text from a failure analysis.

        ``parametersContext`` is accepted for interface compatibility and not read.
        """
        if not failureAnalysis.get('hasFailures', False):
            return "No previous parameter failures. Use standard parameter values."

        guidanceParts = []

        attemptNumber = failureAnalysis.get('attemptNumber', 1)
        if attemptNumber >= 3:
            guidanceParts.append(f"ATTEMPT #{attemptNumber}: Previous attempts failed. Adjust parameters based on validation feedback.")

        if actionType == "outlook.composeAndSendEmailWithContext":
            guidanceParts.append("EMAIL PARAMETER GUIDANCE:")
            guidanceParts.append("- context: Be very specific about account (valueon), appointment time (Friday), and requirements")
            guidanceParts.append("- emailStyle: Use 'formal' for business emails")
            guidanceParts.append("- maxLength: Set to 2000+ for detailed emails with summaries")

            # NOTE(review): the hints below name parameters of the e-mail
            # action and are kept inside this branch - confirm the nesting.
            commonIssues = failureAnalysis.get('commonIssues', {})
            if self._issuesMention(commonIssues, "account"):
                guidanceParts.append("- context: MUST specify 'from valueon account' explicitly")
            if self._issuesMention(commonIssues, "attachment"):
                guidanceParts.append("- documentList: Ensure PDF is properly referenced")
            if self._issuesMention(commonIssues, "summary"):
                guidanceParts.append("- context: MUST request '10-12 sentence German summary' explicitly")

        return "\n".join(guidanceParts) if guidanceParts else "Use standard parameter values."

    @staticmethod
    def _escalationLevelForFailures(failedAttempts: int) -> str:
        """Map a failure count to 'low' (0), 'medium' (1-2), 'high' (3-4) or 'critical' (5+)."""
        if failedAttempts >= 5:
            return "critical"
        if failedAttempts >= 3:
            return "high"
        if failedAttempts >= 1:
            return "medium"
        return "low"

    def _getEscalationLevel(self, workflowId: str) -> str:
        """Return the escalation level for all failures ever recorded for the workflow."""
        failedAttempts = sum(
            1 for entry in self.validationHistory
            if entry['workflowId'] == workflowId and not entry['success']
        )
        return self._escalationLevelForFailures(failedAttempts)

    def _generateLearningInsights(self, workflowId: str, actionType: str):
        """Recompute and store the summary dict for ``workflowId``.

        The stored dict holds ``totalAttempts``, ``successfulAttempts``,
        ``failedAttempts``, ``lastActionType`` and ``escalationLevel``.
        """
        workflowValidations = [
            entry for entry in self.validationHistory
            if entry['workflowId'] == workflowId
        ]
        totalAttempts = len(workflowValidations)
        failedAttempts = sum(1 for entry in workflowValidations if not entry['success'])
        self.learningInsights[workflowId] = {
            'totalAttempts': totalAttempts,
            'successfulAttempts': totalAttempts - failedAttempts,
            'failedAttempts': failedAttempts,
            'lastActionType': actionType,
            'escalationLevel': self._escalationLevelForFailures(failedAttempts),
        }