gateway/modules/workflows/processing/adaptive/adaptiveLearningEngine.py
2026-03-03 18:57:20 +01:00

260 lines
11 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
# adaptiveLearningEngine.py
# Enhanced learning engine that tracks validation patterns and adapts prompts
import logging
from collections import Counter, defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


class AdaptiveLearningEngine:
    """Record validation outcomes and derive adaptive guidance for prompts.

    Every call to ``recordValidationResult`` appends an entry to
    ``validationHistory``. The ``getAdaptiveContext*`` methods read that
    history and return plain dictionaries (recent validations, aggregated
    failure issues, guidance text and an escalation level).
    """

    # How many of a workflow's latest validations feed action selection.
    _WORKFLOW_WINDOW = 5
    # How many of an action's latest validations feed parameter selection.
    _ACTION_WINDOW = 3

    def __init__(self):
        # Chronological list of every recorded validation entry.
        self.validationHistory: List[Dict[str, Any]] = []
        # Entries grouped by action name, split by outcome.
        self.failurePatterns: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        self.successPatterns: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        # Number of recorded validations keyed by "<workflowId>:<actionName>".
        self.actionAttempts: Dict[str, int] = defaultdict(int)
        # Latest summary statistics keyed by workflow id.
        self.learningInsights: Dict[str, Dict[str, Any]] = {}

    def reset(self):
        """Reset all learned state for a new workflow session."""
        self.validationHistory.clear()
        self.failurePatterns.clear()
        self.successPatterns.clear()
        self.actionAttempts.clear()
        self.learningInsights.clear()

    def recordValidationResult(self, validationResult: Dict[str, Any], actionContext: Dict[str, Any],
                               workflowId: str, attemptNumber: int):
        """Record a validation result and refresh the workflow's insights.

        Args:
            validationResult: Validation output; ``overallSuccess``,
                ``qualityScore`` and ``validationDetails`` are the keys read
                by this class.
            actionContext: Context of the validated action; ``actionName`` is
                read from it (defaults to ``'unknown'``).
            workflowId: Identifier of the workflow the action belongs to.
            attemptNumber: Caller-supplied attempt number, stored and logged
                as given.

        Errors are logged and swallowed so that bookkeeping never interrupts
        the calling workflow.
        """
        try:
            actionName = actionContext.get('actionName', 'unknown')
            success = validationResult.get('overallSuccess', False)
            qualityScore = validationResult.get('qualityScore', 0.0)

            validationEntry = {
                'workflowId': workflowId,
                'actionName': actionName,
                'attemptNumber': attemptNumber,
                'validationResult': validationResult,
                'actionContext': actionContext,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'success': success,
                'qualityScore': qualityScore,
            }
            self.validationHistory.append(validationEntry)

            # File the entry under the matching outcome bucket.
            patterns = self.successPatterns if success else self.failurePatterns
            patterns[actionName].append(validationEntry)

            self.actionAttempts[f"{workflowId}:{actionName}"] += 1
            self._generateLearningInsights(workflowId, actionName)

            logger.info(
                "Recorded validation for %s (attempt %s): Success=%s, Quality=%s",
                actionName, attemptNumber, success, qualityScore,
            )
        except Exception as e:
            # Best-effort boundary: keep the traceback in the log, do not raise.
            logger.exception("Error recording validation result: %s", e)

    def getAdaptiveContextForActionSelection(self, workflowId: str, userPrompt: str) -> Dict[str, Any]:
        """Generate adaptive context for the action selection prompt.

        Args:
            workflowId: Workflow whose history is inspected.
            userPrompt: Forwarded to the guidance generator.

        Returns:
            Dict with ``recentValidations`` (last 5 entries of the workflow),
            ``failureAnalysis``, ``adaptiveGuidance``, ``learningInsights``
            and ``escalationLevel``; an empty dict if an error occurred.
        """
        try:
            workflowValidations = [
                entry for entry in self.validationHistory
                if entry['workflowId'] == workflowId
            ]
            recentValidations = workflowValidations[-self._WORKFLOW_WINDOW:]

            failureAnalysis = self._analyzeFailurePatterns(recentValidations)
            adaptiveGuidance = self._generateActionGuidance(userPrompt, recentValidations, failureAnalysis)

            return {
                'recentValidations': recentValidations,
                'failureAnalysis': failureAnalysis,
                'adaptiveGuidance': adaptiveGuidance,
                'learningInsights': self.learningInsights.get(workflowId, {}),
                'escalationLevel': self._getEscalationLevel(workflowId),
            }
        except Exception as e:
            logger.exception("Error generating adaptive context: %s", e)
            return {}

    def getAdaptiveContextForParameters(self, workflowId: str, actionName: str,
                                        parametersContext: str) -> Dict[str, Any]:
        """Generate adaptive context for the parameter selection prompt.

        Args:
            workflowId: Workflow whose history is inspected.
            actionName: Only validations of this action are considered.
            parametersContext: Forwarded to the guidance generator.

        Returns:
            Dict with ``actionValidations`` (last 3 entries of the action),
            ``failureAnalysis``, ``parameterGuidance``, ``attemptNumber``
            (number of the upcoming attempt) and ``escalationLevel``; an
            empty dict if an error occurred.
        """
        try:
            allActionValidations = [
                entry for entry in self.validationHistory
                if entry['workflowId'] == workflowId and entry['actionName'] == actionName
            ]
            # Count before windowing; otherwise the attempt number would
            # saturate at the window size.
            totalAttempts = len(allActionValidations)
            actionValidations = allActionValidations[-self._ACTION_WINDOW:]

            failureAnalysis = self._analyzeParameterFailures(actionValidations, totalAttempts)
            parameterGuidance = self._generateParameterGuidance(actionName, parametersContext, failureAnalysis)

            return {
                'actionValidations': actionValidations,
                'failureAnalysis': failureAnalysis,
                'parameterGuidance': parameterGuidance,
                'attemptNumber': totalAttempts + 1,
                'escalationLevel': self._getEscalationLevel(workflowId),
            }
        except Exception as e:
            logger.exception("Error generating parameter context: %s", e)
            return {}

    @staticmethod
    def _countIssues(failedValidations: List[Dict[str, Any]]) -> Dict[Any, int]:
        """Count issues listed in the ``validationDetails`` of failed entries.

        Malformed data is tolerated instead of aborting the whole analysis:
        a missing/``None`` details value is ignored, non-dict detail entries
        are skipped, a single non-list ``issues`` value counts as one issue,
        and unhashable issues are counted under their ``str()`` form.

        Returns:
            Dict mapping issue -> occurrences, most frequent first (ties keep
            first-seen order).
        """
        counts: Counter = Counter()
        for validation in failedValidations:
            details = validation['validationResult'].get('validationDetails') or []
            if not isinstance(details, (list, tuple)):
                details = [details]
            for detail in details:
                if not isinstance(detail, dict):
                    continue
                issues = detail.get('issues') or []
                if not isinstance(issues, (list, tuple, set, frozenset)):
                    # A bare string must not be split into characters.
                    issues = [issues]
                for issue in issues:
                    try:
                        hash(issue)
                    except TypeError:
                        issue = str(issue)
                    counts[issue] += 1
        return dict(counts.most_common())

    def _analyzeFailurePatterns(self, validations: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Analyze failure patterns from validation history.

        Returns:
            ``{}`` when ``validations`` is empty, ``{'hasFailures': False}``
            when none failed, otherwise a dict with ``hasFailures``,
            ``failureCount``, ``commonIssues`` and ``lastFailure``.
        """
        if not validations:
            return {}

        failedValidations = [entry for entry in validations if not entry['success']]
        if not failedValidations:
            return {'hasFailures': False}

        return {
            'hasFailures': True,
            'failureCount': len(failedValidations),
            'commonIssues': self._countIssues(failedValidations),
            'lastFailure': failedValidations[-1],
        }

    def _analyzeParameterFailures(self, validations: List[Dict[str, Any]],
                                  totalAttempts: Optional[int] = None) -> Dict[str, Any]:
        """Analyze parameter-specific failure patterns.

        Args:
            validations: Validation entries of a single action (possibly a
                truncated window of the most recent ones).
            totalAttempts: Total number of recorded attempts for the action.
                When omitted, ``len(validations)`` is used.

        Returns:
            ``{'hasFailures': False}`` when there is nothing to report,
            otherwise a dict with ``hasFailures``, ``failureCount``,
            ``commonIssues``, ``lastFailure`` and ``attemptNumber``.
        """
        failedValidations = [entry for entry in validations if not entry['success']]
        if not failedValidations:
            return {'hasFailures': False}

        return {
            'hasFailures': True,
            'failureCount': len(failedValidations),
            'commonIssues': self._countIssues(failedValidations),
            'lastFailure': failedValidations[-1],
            'attemptNumber': len(validations) if totalAttempts is None else totalAttempts,
        }

    def _generateActionGuidance(self, userPrompt: str, validations: List[Dict[str, Any]],
                                failureAnalysis: Dict[str, Any]) -> str:
        """Generate guidance text for the next action from a failure analysis.

        ``userPrompt`` and ``validations`` are accepted for interface
        stability; the text is derived from ``failureAnalysis`` only.
        """
        if not failureAnalysis.get('hasFailures', False):
            return "No previous failures detected. Proceed with standard approach."

        guidanceParts = []

        failureCount = failureAnalysis.get('failureCount', 0)
        if failureCount >= 3:
            guidanceParts.append("CRITICAL: Multiple previous attempts have failed. Consider alternative approaches.")
        elif failureCount >= 1:
            guidanceParts.append("WARNING: Previous attempts have failed. Learn from validation feedback.")

        commonIssues = failureAnalysis.get('commonIssues', {})
        if commonIssues:
            guidanceParts.append("COMMON FAILURE PATTERNS:")
            # commonIssues is ordered by frequency; report the top three.
            for issue, count in list(commonIssues.items())[:3]:
                guidanceParts.append(f"- {issue} (occurred {count} times)")

        return "\n".join(guidanceParts) if guidanceParts else "No specific guidance available."

    def _generateParameterGuidance(self, actionName: str, parametersContext: str,
                                   failureAnalysis: Dict[str, Any]) -> str:
        """Generate generic parameter guidance based on previous failures (no app-specific logic).

        ``actionName`` and ``parametersContext`` are accepted for interface
        stability; the text is derived from ``failureAnalysis`` only.
        """
        if not failureAnalysis.get('hasFailures', False):
            return "No previous parameter failures. Use standard parameter values."

        guidanceParts = []

        attemptNumber = failureAnalysis.get('attemptNumber', 1)
        if attemptNumber and attemptNumber >= 3:
            guidanceParts.append(f"Attempt #{attemptNumber}: Adjust parameters based on validation feedback.")

        commonIssues = failureAnalysis.get('commonIssues', {}) or {}
        if commonIssues:
            guidanceParts.append("Address the following parameter issues:")
            for issueText, count in commonIssues.items():
                guidanceParts.append(f"- {issueText} (occurred {count} time{'s' if count != 1 else ''})")

        # Keep guidance format stable
        return "\n".join(guidanceParts) if guidanceParts else "Use standard parameter values."

    def _getEscalationLevel(self, workflowId: str) -> str:
        """Map the workflow's total failed validations to an escalation level.

        Returns:
            ``"critical"`` (>= 5 failures), ``"high"`` (>= 3), ``"medium"``
            (>= 1) or ``"low"`` (none).
        """
        failedAttempts = sum(
            1 for entry in self.validationHistory
            if entry['workflowId'] == workflowId and not entry['success']
        )
        if failedAttempts >= 5:
            return "critical"
        if failedAttempts >= 3:
            return "high"
        if failedAttempts >= 1:
            return "medium"
        return "low"

    def _generateLearningInsights(self, workflowId: str, actionName: str):
        """Recompute and store the summary statistics of a workflow."""
        workflowValidations = [
            entry for entry in self.validationHistory
            if entry['workflowId'] == workflowId
        ]
        successfulAttempts = sum(1 for entry in workflowValidations if entry['success'])

        self.learningInsights[workflowId] = {
            'totalAttempts': len(workflowValidations),
            'successfulAttempts': successfulAttempts,
            'failedAttempts': len(workflowValidations) - successfulAttempts,
            'lastActionName': actionName,
            'escalationLevel': self._getEscalationLevel(workflowId),
        }