# intentAnalyzer.py
# Intent analysis for adaptive React mode - AI-based, language-agnostic
import json
import logging
from typing import Any, Dict, List, Optional
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class IntentAnalyzer:
    """Analyzes user intent using AI - language-agnostic and generic.

    The analyzer sends the user's request to the configured AI service and
    returns the JSON object the service answers with. When no AI service is
    configured, or its answer is not a usable JSON object, a static
    fallback analysis is returned instead.
    """

    def __init__(self, services=None):
        """Store the service container.

        Args:
            services: Object that exposes an ``ai`` attribute providing an
                awaitable ``callAi(prompt=..., documents=..., options=...)``.
                May be ``None``; then only fallback analyses are produced.
        """
        self.services = services

    async def analyzeUserIntent(self, userPrompt: str, context: Any) -> Dict[str, Any]:
        """Analyze user intent from prompt and context using AI.

        Args:
            userPrompt: The raw request text of the user.
            context: Optional object; if it has a truthy ``task_step``
                attribute, that step's ``objective`` is added to the prompt.

        Returns:
            The AI-produced analysis when it is a non-empty dict, otherwise
            the basic fallback (confidence 0.5). If anything raises, the
            default fallback (confidence 0.1) is returned; this method does
            not propagate ``Exception`` subclasses.
        """
        try:
            aiAnalysis = await self._analyzeIntentWithAI(userPrompt, context)
            if aiAnalysis:
                return aiAnalysis

            # AI unavailable or unusable: fall back to the static analysis.
            return self._createBasicIntentAnalysis(userPrompt)
        except Exception as e:
            # exc_info keeps the traceback, which the message alone loses.
            logger.error("Error analyzing user intent: %s", e, exc_info=True)
            return self._createDefaultIntentAnalysis(userPrompt)

    async def _analyzeIntentWithAI(self, userPrompt: str, context: Any) -> Optional[Dict[str, Any]]:
        """Ask the AI service to analyze the user's intent.

        Returns:
            The parsed JSON object of the AI answer, or ``None`` when no AI
            service is configured, the answer is empty, the answer holds no
            JSON object, or the call fails.
        """
        try:
            # getattr also covers a container whose ``ai`` attribute is None.
            aiService = getattr(self.services, 'ai', None) if self.services else None
            if aiService is None:
                return None

            analysisPrompt = self._buildAnalysisPrompt(userPrompt, context)

            # Local import: resolved only when an AI call is actually made.
            from modules.datamodels.datamodelAi import AiCallOptions, OperationType

            requestOptions = AiCallOptions()
            requestOptions.operationType = OperationType.GENERAL

            response = await aiService.callAi(
                prompt=analysisPrompt,
                documents=None,
                options=requestOptions,
            )
            if not response:
                return None

            aiResult = self._parseAiResponse(response)
            if aiResult is None:
                logger.warning("AI intent analysis returned no usable JSON object")
            return aiResult
        except Exception as e:
            logger.error("AI intent analysis failed: %s", e, exc_info=True)
            return None

    @staticmethod
    def _buildAnalysisPrompt(userPrompt: str, context: Any) -> str:
        """Build the instruction prompt that is sent to the AI service."""
        taskStep = getattr(context, 'task_step', None)
        objective = getattr(taskStep, 'objective', '') if taskStep else ''

        promptLines = (
            "",
            "You are an intent analyzer. Analyze the user's request to understand what they want delivered.",
            "",
            f"USER REQUEST: {userPrompt}",
            "",
            f"CONTEXT: {objective}",
            "",
            "Analyze the user's intent and determine:",
            "1. What type of data/content they want (numbers, text, documents, analysis, code, etc.)",
            "2. What format they expect (raw data, formatted, structured, visual, etc.)",
            "3. What quality requirements they have (accuracy, completeness, format)",
            "4. What specific success criteria define completion",
            "",
            "Respond with JSON only:",
            "{",
            '"primaryGoal": "The main objective the user wants to achieve",',
            '"dataType": "numbers|text|documents|analysis|code|unknown",',
            '"expectedFormat": "raw_data|formatted|structured|visual|unknown",',
            '"qualityRequirements": {',
            '"accuracyThreshold": 0.0-1.0,',
            '"completenessThreshold": 0.0-1.0,',
            '"formatRequirement": "any|formatted|raw|structured"',
            "},",
            '"successCriteria": ["specific criterion 1", "specific criterion 2"],',
            '"confidenceScore": 0.0-1.0',
            "}",
            "",
        )
        return "\n".join(promptLines)

    @staticmethod
    def _parseAiResponse(response: Any) -> Optional[Dict[str, Any]]:
        """Extract the JSON object contained in an AI answer.

        Args:
            response: The raw answer of the AI service.

        Returns:
            The decoded object as a dict, or ``None`` when ``response`` is
            not a string or contains no decodable JSON object. Non-object
            JSON (lists, numbers, strings) is rejected, because callers
            expect a dict.
        """
        if not isinstance(response, str):
            return None

        text = response.strip()
        start = text.find("{")
        if start == -1:
            return None

        # First attempt: everything from the first "{" to the last "}".
        # This drops surrounding prose or markdown fences.
        end = text.rfind("}")
        if end > start:
            try:
                parsed = json.loads(text[start:end + 1])
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                return parsed

        # Second attempt: decode a single JSON value at the first "{" and
        # ignore what follows, so trailing text containing "}" is tolerated.
        try:
            parsed, _ = json.JSONDecoder().raw_decode(text, start)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    @staticmethod
    def _buildFallbackAnalysis(primaryGoal: Any, confidenceScore: float) -> Dict[str, Any]:
        """Build a fresh static analysis dict shared by both fallbacks."""
        return {
            "primaryGoal": primaryGoal,
            "dataType": "unknown",
            "expectedFormat": "unknown",
            "qualityRequirements": {
                "accuracyThreshold": 0.8,
                "completenessThreshold": 0.8,
                "formatRequirement": "any",
            },
            "successCriteria": ["Delivers what the user requested"],
            "confidenceScore": confidenceScore,
        }

    def _createBasicIntentAnalysis(self, userPrompt: str) -> Dict[str, Any]:
        """Create the basic intent analysis without AI (confidence 0.5).

        The stripped prompt is used as the primary goal.
        """
        return self._buildFallbackAnalysis(userPrompt.strip(), 0.5)

    def _createDefaultIntentAnalysis(self, userPrompt: str) -> Dict[str, Any]:
        """Create the default intent analysis when analysis fails (confidence 0.1).

        The prompt is passed through untouched: no string method is called
        on it, so a non-string value does not raise here.
        """
        return self._buildFallbackAnalysis(userPrompt, 0.1)
|