Centralized AI

This commit is contained in:
ValueOn AG 2025-09-02 21:11:32 +02:00
parent 3c1f66cb6d
commit 8726cd4fb8
5 changed files with 459 additions and 1285 deletions

View file

@ -1,276 +0,0 @@
# AI Engine Migration Plan
## Overview
This document outlines the migration strategy from the current AI call system to the new Smart AI Engine architecture.
## Benefits of the New Architecture
### 1. **Separation of Concerns**
- Applications no longer need to worry about content size limits
- Centralized AI model selection and failover
- Intelligent content reduction strategies
### 2. **Improved Reliability**
- Automatic handling of "content too large" errors
- Multiple fallback strategies
- Model-specific optimization
### 3. **Better Performance**
- Optimal model selection based on content characteristics
- Intelligent chunking and processing strategies
- Reduced API costs through smart model selection
### 4. **Enhanced Maintainability**
- Single point of AI logic
- Easy to add new models and strategies
- Consistent error handling
## Migration Phases
### Phase 1: Infrastructure Setup (Week 1-2)
1. **Create AI Engine Interface**
- ✅ `interfaceAiEngine.py` - Core interfaces and data structures
- ✅ `aiEngine.py` - Smart AI Engine implementation
- ✅ `serviceCenter_ai_engine.py` - ServiceCenter integration
2. **Update Dependencies**
- Add new imports to existing modules
- Update configuration for AI model selection
- Add logging for AI engine operations
### Phase 2: ServiceCenter Integration (Week 3)
1. **Update ServiceCenter Class**
```python
# Add to ServiceCenter.__init__
self.ai_engine = ServiceCenterAIEngine(self)
# Replace existing AI methods
async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
return await self.ai_engine.callAiTextAdvanced(prompt, context)
async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
return await self.ai_engine.callAiTextBasic(prompt, context)
async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
return await self.ai_engine.extractContentFromDocument(prompt, document)
async def summarizeChat(self, messages: List[ChatMessage]) -> str:
return await self.ai_engine.summarizeChat(messages)
```
2. **Add New Document-Aware Methods**
```python
async def callAiWithDocuments(
self,
prompt: str,
documents: List[ChatDocument] = None,
operation_type: str = "general"
) -> str:
return await self.ai_engine.callAiWithDocuments(
prompt, documents, operation_type=operation_type
)
```
### Phase 3: Method Updates (Week 4-5)
1. **Update MethodWeb.py**
```python
# Before
web_scrape_result = await web_interface.scrape(web_scrape_request)
# After - no changes needed, but can be enhanced
# The AI engine will automatically handle large content
```
2. **Update MethodDocument.py**
```python
# Before
formatted_content = await self.service.callAiTextBasic(ai_prompt, content)
# After
formatted_content = await self.service.callAiForReportGeneration(
prompt=ai_prompt,
documents=chat_documents
)
```
3. **Update MethodAi.py**
```python
# Before
result = await self.service.callAiTextAdvanced(enhanced_prompt, context)
# After
result = await self.service.callAiWithDocuments(
prompt=enhanced_prompt,
documents=document_list,
operation_type="ai_processing"
)
```
4. **Update MethodOutlook.py**
```python
# Before
composed_email = await self.service.interfaceAiCalls.callAiTextAdvanced(ai_prompt)
# After
composed_email = await self.service.callAiForEmailComposition(
prompt=ai_prompt,
documents=attached_documents
)
```
### Phase 4: Task Handling Updates (Week 6)
1. **Update handlingTasks.py**
```python
# Before
prompt = await self.service.callAiTextAdvanced(task_planning_prompt)
# After
prompt = await self.service.callAiForTaskPlanning(
prompt=task_planning_prompt,
documents=available_documents,
context=workflow_context
)
```
2. **Update promptFactory.py**
```python
# Before
messageSummary = await service.summarizeChat(context.workflow.messages)
# After - no changes needed, method signature stays the same
# But internally uses the new AI engine
```
### Phase 5: Testing and Optimization (Week 7-8)
1. **Unit Tests**
- Test AI engine with various content sizes
- Test fallback strategies
- Test model selection logic
2. **Integration Tests**
- Test with real documents of various sizes
- Test error scenarios
- Test performance improvements
3. **Performance Monitoring**
- Monitor AI call success rates
- Monitor processing times
- Monitor cost savings
## Code Changes Required
### 1. ServiceCenter Updates
```python
# Add to ServiceCenter.__init__
from modules.chat.serviceCenter_ai_engine import ServiceCenterAIEngine
self.ai_engine_wrapper = ServiceCenterAIEngine(self)
# Update existing methods to use AI engine
async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
return await self.ai_engine_wrapper.callAiTextAdvanced(prompt, context)
async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
return await self.ai_engine_wrapper.callAiTextBasic(prompt, context)
async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
return await self.ai_engine_wrapper.extractContentFromDocument(prompt, document)
async def summarizeChat(self, messages: List[ChatMessage]) -> str:
return await self.ai_engine_wrapper.summarizeChat(messages)
```
### 2. Method Updates (Optional Enhancements)
```python
# Enhanced method calls with document awareness
async def process_documents_with_ai(self, prompt: str, documents: List[ChatDocument]):
return await self.service.callAiWithDocuments(
prompt=prompt,
documents=documents,
operation_type="document_processing"
)
```
### 3. Configuration Updates
```ini
# Add to config.ini
[AI_ENGINE]
DEFAULT_MODEL=anthropic_claude
FALLBACK_MODEL=openai_gpt35
MAX_CONTENT_SIZE=100000
ENABLE_CONTENT_REDUCTION=true
CONTENT_REDUCTION_THRESHOLD=0.8
```
## Backward Compatibility
### 1. **Method Signatures**
- All existing method signatures remain unchanged
- Internal implementation uses new AI engine
- No breaking changes for existing code
### 2. **Error Handling**
- Same error types and messages
- Enhanced error recovery with fallback strategies
- Better error reporting with processing details
### 3. **Performance**
- Same or better performance
- Automatic optimization based on content
- Reduced API costs through smart model selection
## Risk Mitigation
### 1. **Gradual Rollout**
- Deploy with feature flags
- A/B testing with subset of users
- Rollback capability
### 2. **Monitoring**
- Comprehensive logging of AI engine operations
- Performance metrics tracking
- Error rate monitoring
### 3. **Fallback Strategy**
- Keep original AI call methods as backup
- Automatic fallback to original methods on errors
- Manual override capability
## Expected Benefits
### 1. **Immediate Benefits**
- Elimination of "content too large" errors
- Better handling of large documents
- Improved user experience
### 2. **Long-term Benefits**
- Easier addition of new AI models
- Better cost optimization
- Enhanced content processing capabilities
- Improved system reliability
### 3. **Developer Benefits**
- Simplified AI integration
- No need to worry about content size limits
- Consistent AI behavior across the system
- Better debugging and monitoring
## Success Metrics
### 1. **Error Reduction**
- 90% reduction in "content too large" errors
- 50% reduction in AI call failures
- 95% success rate for document processing
### 2. **Performance Improvement**
- 20% faster processing for large documents
- 30% reduction in API costs
- 50% reduction in retry attempts
### 3. **User Experience**
- Faster response times
- More reliable document processing
- Better content extraction quality
## Conclusion
The new AI Engine architecture provides a robust, scalable solution for handling AI calls with large content. The migration can be done gradually with full backward compatibility, ensuring minimal risk while providing significant benefits in reliability, performance, and maintainability.

View file

@ -1,266 +0,0 @@
"""
ServiceCenter integration with Smart AI Engine
"""
import logging
from typing import List, Dict, Any, Optional
from modules.interfaces.interfaceChatModel import ChatDocument
from modules.interfaces.interfaceAiEngine import (
AIRequest, AIResponse, AIModelType, ProcessingStrategy,
ContentReductionStrategy
)
from modules.engines.aiEngine import SmartAIEngine
logger = logging.getLogger(__name__)
class ServiceCenterAIEngine:
"""ServiceCenter integration with Smart AI Engine"""
def __init__(self, service_center):
self.service_center = service_center
self.ai_engine = SmartAIEngine(service_center)
async def callAiWithDocuments(
self,
prompt: str,
documents: List[ChatDocument] = None,
context: str = None,
preferred_model: AIModelType = None,
operation_type: str = "general",
processing_strategy: ProcessingStrategy = None,
reduction_strategy: ContentReductionStrategy = None,
**kwargs
) -> str:
"""
Unified AI call method that handles documents and prompts separately
Args:
prompt: The AI prompt
documents: List of documents to process
context: Additional context
preferred_model: Preferred AI model
operation_type: Type of operation (for strategy selection)
processing_strategy: Explicit processing strategy
reduction_strategy: Explicit content reduction strategy
**kwargs: Additional parameters
Returns:
AI response content
"""
try:
# Create AI request
request = AIRequest(
prompt=prompt,
documents=documents or [],
context=context,
preferred_model=preferred_model,
processing_strategy=processing_strategy,
reduction_strategy=reduction_strategy,
metadata={
"operation_type": operation_type,
**kwargs
}
)
# Process request
response = await self.ai_engine.process_request(request)
if response.success:
return response.content
else:
raise Exception(f"AI processing failed: {response.error}")
except Exception as e:
logger.error(f"Error in AI call with documents: {str(e)}")
raise e
# Convenience methods for different operation types
async def callAiForTaskPlanning(
self,
prompt: str,
documents: List[ChatDocument] = None,
context: str = None
) -> str:
"""AI call optimized for task planning"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="task_planning",
preferred_model=AIModelType.ANTHROPIC_CLAUDE # Better for complex planning
)
async def callAiForActionDefinition(
self,
prompt: str,
documents: List[ChatDocument] = None,
context: str = None
) -> str:
"""AI call optimized for action definition"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="action_definition",
preferred_model=AIModelType.ANTHROPIC_CLAUDE
)
async def callAiForDocumentExtraction(
self,
prompt: str,
documents: List[ChatDocument],
context: str = None
) -> str:
"""AI call optimized for document extraction"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="document_extraction",
processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT
)
async def callAiForReportGeneration(
self,
prompt: str,
documents: List[ChatDocument],
context: str = None
) -> str:
"""AI call optimized for report generation"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="report_generation",
processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING
)
async def callAiForEmailComposition(
self,
prompt: str,
documents: List[ChatDocument] = None,
context: str = None
) -> str:
"""AI call optimized for email composition"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="email_composition",
preferred_model=AIModelType.OPENAI_GPT4 # Better for creative writing
)
async def callAiForChatSummarization(
self,
prompt: str,
documents: List[ChatDocument] = None,
context: str = None
) -> str:
"""AI call optimized for chat summarization"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="chat_summarization",
processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT
)
async def callAiForImageAnalysis(
self,
prompt: str,
documents: List[ChatDocument],
context: str = None
) -> str:
"""AI call optimized for image analysis"""
return await self.callAiWithDocuments(
prompt=prompt,
documents=documents,
context=context,
operation_type="image_analysis",
preferred_model=AIModelType.OPENAI_VISION,
requires_vision=True
)
# Backward compatibility methods
async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
"""Backward compatibility method"""
return await self.callAiWithDocuments(
prompt=prompt,
context=context,
operation_type="general",
preferred_model=AIModelType.ANTHROPIC_CLAUDE
)
async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
"""Backward compatibility method"""
return await self.callAiWithDocuments(
prompt=prompt,
context=context,
operation_type="general",
preferred_model=AIModelType.OPENAI_GPT35
)
async def callAiImageBasic(self, prompt: str, image_data: str, mime_type: str) -> str:
"""Backward compatibility method for image processing"""
# Create a document from image data
image_doc = self.service_center.createDocument(
"image_analysis.jpg",
mime_type,
image_data,
base64encoded=True
)
return await self.callAiForImageAnalysis(
prompt=prompt,
documents=[image_doc]
)
async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
"""Enhanced document extraction using AI engine"""
try:
return await self.callAiForDocumentExtraction(
prompt=prompt,
documents=[document]
)
except Exception as e:
logger.error(f"Error in enhanced document extraction: {str(e)}")
# Fall back to original method
from modules.interfaces.interfaceChatModel import ExtractedContent
extracted = await self.service_center.documentProcessor.processFileData(
fileData=self.service_center.getFileData(document.fileId),
fileName=document.fileName,
mimeType=document.mimeType,
prompt=prompt,
documentId=document.id
)
if extracted and extracted.contents:
return "\n".join([item.data for item in extracted.contents])
return ""
async def summarizeChat(self, messages: List) -> str:
"""Enhanced chat summarization using AI engine"""
try:
# Convert messages to a simple text format
chat_content = "\n".join([f"{msg.role}: {msg.message}" for msg in messages if hasattr(msg, 'message')])
# Create a document from chat content
chat_doc = self.service_center.createDocument(
"chat_history.txt",
"text/plain",
chat_content,
base64encoded=False
)
return await self.callAiForChatSummarization(
prompt="Summarize this chat conversation, focusing on key decisions, outcomes, and next steps.",
documents=[chat_doc]
)
except Exception as e:
logger.error(f"Error in enhanced chat summarization: {str(e)}")
# Fall back to original method
return await self.service_center.callAiTextBasic(
f"Summarize this chat conversation: {chat_content}"
)

View file

@ -1,544 +0,0 @@
"""
Smart AI Engine with intelligent content management and model selection
"""
import logging
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from modules.interfaces.interfaceAiEngine import (
AIEngine, AIRequest, AIResponse, AIModelType, ProcessingStrategy,
ContentReductionStrategy, ModelCapabilities, ContentReducer
)
from modules.interfaces.interfaceChatModel import ChatDocument
from modules.interfaces.interfaceAiCalls import AiCalls
from modules.chat.documents.documentExtraction import DocumentExtraction
from modules.shared.configuration import APP_CONFIG
logger = logging.getLogger(__name__)
class SmartAIEngine(AIEngine):
"""Smart AI Engine with automatic content management and model selection"""
def __init__(self, service_center=None):
self.service_center = service_center
self.ai_calls = AiCalls()
self.document_processor = DocumentExtraction(service_center)
self.content_reducer = SmartContentReducer(service_center)
# Model capabilities mapping
self.model_capabilities = {
AIModelType.OPENAI_GPT4: ModelCapabilities(
max_tokens=8192,
max_input_tokens=128000,
supports_vision=False,
supports_function_calling=True,
cost_per_1k_tokens=0.03,
processing_speed="medium"
),
AIModelType.OPENAI_GPT35: ModelCapabilities(
max_tokens=4096,
max_input_tokens=16384,
supports_vision=False,
supports_function_calling=True,
cost_per_1k_tokens=0.002,
processing_speed="fast"
),
AIModelType.ANTHROPIC_CLAUDE: ModelCapabilities(
max_tokens=4096,
max_input_tokens=200000,
supports_vision=False,
supports_function_calling=False,
cost_per_1k_tokens=0.015,
processing_speed="medium"
),
AIModelType.OPENAI_VISION: ModelCapabilities(
max_tokens=4096,
max_input_tokens=128000,
supports_vision=True,
supports_function_calling=False,
cost_per_1k_tokens=0.01,
processing_speed="slow"
)
}
# Processing strategy preferences
self.strategy_preferences = {
"task_planning": ProcessingStrategy.SINGLE_CALL,
"action_definition": ProcessingStrategy.SINGLE_CALL,
"document_extraction": ProcessingStrategy.DOCUMENT_BY_DOCUMENT,
"report_generation": ProcessingStrategy.CHUNKED_PROCESSING,
"email_composition": ProcessingStrategy.SINGLE_CALL,
"chat_summarization": ProcessingStrategy.SUMMARIZED_CONTENT
}
async def process_request(self, request: AIRequest) -> AIResponse:
"""Process AI request with intelligent content management"""
try:
# Step 1: Determine optimal processing strategy
strategy = self._determine_processing_strategy(request)
request.processing_strategy = strategy
# Step 2: Estimate token usage
estimated_tokens = await self.estimate_token_usage(request)
# Step 3: Select appropriate model
model = self._select_optimal_model(request, estimated_tokens)
# Step 4: Process with selected strategy
if strategy == ProcessingStrategy.SINGLE_CALL:
return await self._process_single_call(request, model)
elif strategy == ProcessingStrategy.DOCUMENT_BY_DOCUMENT:
return await self._process_document_by_document(request, model)
elif strategy == ProcessingStrategy.CHUNKED_PROCESSING:
return await self._process_chunked(request, model)
elif strategy == ProcessingStrategy.SUMMARIZED_CONTENT:
return await self._process_with_summarization(request, model)
else:
raise ValueError(f"Unknown processing strategy: {strategy}")
except Exception as e:
logger.error(f"Error processing AI request: {str(e)}")
return AIResponse(
success=False,
content="",
model_used=AIModelType.OPENAI_GPT35,
processing_strategy=ProcessingStrategy.SINGLE_CALL,
error=str(e)
)
def _determine_processing_strategy(self, request: AIRequest) -> ProcessingStrategy:
"""Determine the best processing strategy based on request characteristics"""
# Use explicit strategy if provided
if request.processing_strategy:
return request.processing_strategy
# Determine based on metadata or content characteristics
metadata = request.metadata or {}
operation_type = metadata.get("operation_type", "general")
# Check if we have a preference for this operation type
if operation_type in self.strategy_preferences:
return self.strategy_preferences[operation_type]
# Auto-determine based on content characteristics
num_documents = len(request.documents)
prompt_length = len(request.prompt)
if num_documents == 0:
return ProcessingStrategy.SINGLE_CALL
elif num_documents == 1:
return ProcessingStrategy.SINGLE_CALL
elif num_documents <= 3 and prompt_length < 1000:
return ProcessingStrategy.SINGLE_CALL
elif num_documents > 5:
return ProcessingStrategy.DOCUMENT_BY_DOCUMENT
else:
return ProcessingStrategy.CHUNKED_PROCESSING
def _select_optimal_model(self, request: AIRequest, estimated_tokens: int) -> AIModelType:
"""Select the optimal AI model based on request characteristics"""
# Use preferred model if specified and suitable
if request.preferred_model:
capabilities = self.get_model_capabilities(request.preferred_model)
if estimated_tokens <= capabilities.max_input_tokens:
return request.preferred_model
# Select model based on requirements
metadata = request.metadata or {}
requires_vision = metadata.get("requires_vision", False)
requires_function_calling = metadata.get("requires_function_calling", False)
# Filter models by requirements
suitable_models = []
for model, capabilities in self.model_capabilities.items():
if estimated_tokens <= capabilities.max_input_tokens:
if requires_vision and not capabilities.supports_vision:
continue
if requires_function_calling and not capabilities.supports_function_calling:
continue
suitable_models.append((model, capabilities))
if not suitable_models:
# If no model can handle the full content, use the one with highest capacity
best_model = max(self.model_capabilities.items(),
key=lambda x: x[1].max_input_tokens)
logger.warning(f"No model can handle {estimated_tokens} tokens, using {best_model[0]}")
return best_model[0]
# Select based on cost and speed preferences
# For now, prefer Claude for large content, GPT-4 for complex tasks, GPT-3.5 for simple tasks
if estimated_tokens > 50000:
return AIModelType.ANTHROPIC_CLAUDE
elif metadata.get("complex_task", False):
return AIModelType.OPENAI_GPT4
else:
return AIModelType.OPENAI_GPT35
async def _process_single_call(self, request: AIRequest, model: AIModelType) -> AIResponse:
"""Process request with a single AI call"""
try:
# Prepare content
content = await self._prepare_content_for_single_call(request)
# Make AI call
if model in [AIModelType.OPENAI_GPT4, AIModelType.OPENAI_GPT35]:
response = await self.ai_calls.callAiTextAdvanced(content, request.context)
elif model == AIModelType.ANTHROPIC_CLAUDE:
response = await self.ai_calls.callAiTextAdvanced(content, request.context)
else:
raise ValueError(f"Unsupported model for single call: {model}")
return AIResponse(
success=True,
content=response,
model_used=model,
processing_strategy=ProcessingStrategy.SINGLE_CALL
)
except Exception as e:
# If single call fails due to size, try with content reduction
if "too large" in str(e).lower() or "400" in str(e):
return await self._process_with_content_reduction(request, model)
else:
raise e
async def _process_document_by_document(self, request: AIRequest, model: AIModelType) -> AIResponse:
"""Process each document separately and merge results"""
try:
results = []
for i, document in enumerate(request.documents):
# Create individual request for each document
doc_request = AIRequest(
prompt=request.prompt,
documents=[document],
context=request.context,
preferred_model=model,
metadata=request.metadata
)
# Process document
doc_response = await self._process_single_call(doc_request, model)
if doc_response.success:
results.append(f"Document {i+1} ({document.fileName}):\n{doc_response.content}")
else:
results.append(f"Document {i+1} ({document.fileName}): Error - {doc_response.error}")
# Merge results
merged_content = "\n\n".join(results)
return AIResponse(
success=True,
content=merged_content,
model_used=model,
processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT
)
except Exception as e:
logger.error(f"Error in document-by-document processing: {str(e)}")
return AIResponse(
success=False,
content="",
model_used=model,
processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT,
error=str(e)
)
async def _process_chunked(self, request: AIRequest, model: AIModelType) -> AIResponse:
"""Process content in chunks and merge results"""
try:
# This would implement chunked processing logic
# For now, fall back to document-by-document
return await self._process_document_by_document(request, model)
except Exception as e:
logger.error(f"Error in chunked processing: {str(e)}")
return AIResponse(
success=False,
content="",
model_used=model,
processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING,
error=str(e)
)
async def _process_with_summarization(self, request: AIRequest, model: AIModelType) -> AIResponse:
"""Process with content summarization first"""
try:
# Summarize documents first
summarized_docs = []
for document in request.documents:
summary_doc = await self.content_reducer.summarize_document(
document,
f"Summarize this document for: {request.prompt}"
)
summarized_docs.append(summary_doc)
# Create new request with summarized documents
summary_request = AIRequest(
prompt=request.prompt,
documents=summarized_docs,
context=request.context,
preferred_model=model,
metadata=request.metadata
)
# Process with summarized content
return await self._process_single_call(summary_request, model)
except Exception as e:
logger.error(f"Error in summarization processing: {str(e)}")
return AIResponse(
success=False,
content="",
model_used=model,
processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT,
error=str(e)
)
async def _process_with_content_reduction(self, request: AIRequest, model: AIModelType) -> AIResponse:
"""Process with automatic content reduction"""
try:
# Determine reduction strategy
strategy = self._determine_reduction_strategy(request)
# Reduce content
reduced_docs, reduced_prompt = await self.content_reducer.reduce_content(
request.documents,
request.prompt,
strategy,
target_reduction=0.5
)
# Create new request with reduced content
reduced_request = AIRequest(
prompt=reduced_prompt,
documents=reduced_docs,
context=request.context,
preferred_model=model,
metadata=request.metadata
)
# Try processing with reduced content
return await self._process_single_call(reduced_request, model)
except Exception as e:
logger.error(f"Error in content reduction processing: {str(e)}")
return AIResponse(
success=False,
content="",
model_used=model,
processing_strategy=ProcessingStrategy.SINGLE_CALL,
error=f"Content reduction failed: {str(e)}"
)
def _determine_reduction_strategy(self, request: AIRequest) -> ContentReductionStrategy:
"""Determine the best content reduction strategy"""
# Use explicit strategy if provided
if request.reduction_strategy:
return request.reduction_strategy
# Auto-determine based on request characteristics
metadata = request.metadata or {}
operation_type = metadata.get("operation_type", "general")
# Different strategies for different operations
if operation_type in ["task_planning", "action_definition"]:
# For planning tasks, prompt is crucial
return ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY
elif operation_type in ["document_extraction", "report_generation"]:
# For document processing, documents are crucial
return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS
else:
# Default: reduce both
return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS
async def _prepare_content_for_single_call(self, request: AIRequest) -> str:
"""Prepare content for a single AI call"""
content_parts = [request.prompt]
if request.context:
content_parts.append(f"Context: {request.context}")
# Add document content
for i, document in enumerate(request.documents):
try:
# Extract document content
extracted = await self.service_center.extractContentFromDocument(
"Extract all relevant text content",
document
)
if extracted and extracted.contents:
doc_content = "\n".join([item.data for item in extracted.contents])
content_parts.append(f"Document {i+1} ({document.fileName}):\n{doc_content}")
else:
content_parts.append(f"Document {i+1} ({document.fileName}): [No content extracted]")
except Exception as e:
logger.warning(f"Could not extract content from document {document.fileName}: {str(e)}")
content_parts.append(f"Document {i+1} ({document.fileName}): [Error extracting content]")
return "\n\n".join(content_parts)
def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities:
"""Get capabilities for a specific model"""
return self.model_capabilities.get(model, self.model_capabilities[AIModelType.OPENAI_GPT35])
async def estimate_token_usage(self, request: AIRequest) -> int:
"""Estimate token usage for a request"""
# Simple estimation: ~4 characters per token
prompt_tokens = len(request.prompt) // 4
context_tokens = len(request.context or "") // 4
# Estimate document tokens
doc_tokens = 0
for document in request.documents:
# Rough estimate based on file size
doc_tokens += document.fileSize // 4
return prompt_tokens + context_tokens + doc_tokens
class SmartContentReducer(ContentReducer):
"""Smart content reducer using document extraction engine"""
def __init__(self, service_center):
self.service_center = service_center
self.document_processor = DocumentExtraction(service_center)
async def reduce_content(
self,
documents: List[ChatDocument],
prompt: str,
strategy: ContentReductionStrategy,
target_reduction: float = 0.5
) -> Tuple[List[ChatDocument], str]:
"""Reduce content size while preserving important information"""
reduced_docs = []
reduced_prompt = prompt
# Sort documents by size (largest first)
sorted_docs = sorted(documents, key=lambda d: d.fileSize, reverse=True)
for document in sorted_docs:
try:
# Create reduction prompt based on strategy
if strategy == ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY:
reduction_prompt = f"""
Summarize this document to {int(100 * (1 - target_reduction))}% of its original size.
Focus on the most important information relevant to: {prompt}
Preserve key facts, data, and conclusions.
"""
elif strategy == ContentReductionStrategy.SUMMARIZE_DOCUMENTS:
reduction_prompt = f"""
Create a concise summary of this document focusing on: {prompt}
Include only the most relevant information.
"""
else: # REDUCE_PROMPT_AND_DOCS or EXTRACT_KEY_INFO
reduction_prompt = f"""
Extract only the key information from this document that is relevant to: {prompt}
Be very selective and concise.
"""
# Process document with reduction
extracted = await self.service_center.extractContentFromDocument(
reduction_prompt,
document
)
if extracted and extracted.contents:
# Create new document with reduced content
reduced_content = "\n".join([item.data for item in extracted.contents])
reduced_doc = await self._create_reduced_document(document, reduced_content)
reduced_docs.append(reduced_doc)
else:
# If reduction fails, keep original document
reduced_docs.append(document)
except Exception as e:
logger.warning(f"Could not reduce document {document.fileName}: {str(e)}")
reduced_docs.append(document)
# Reduce prompt if strategy requires it
if strategy in [ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS]:
reduced_prompt = self._reduce_prompt(prompt, target_reduction)
return reduced_docs, reduced_prompt
async def summarize_document(
self,
document: ChatDocument,
focus_prompt: str
) -> ChatDocument:
"""Create a summary of a document focused on specific aspects"""
summary_prompt = f"""
Create a comprehensive summary of this document focusing on: {focus_prompt}
Include:
- Key points and main ideas
- Important data and statistics
- Conclusions and recommendations
- Any relevant details
Keep the summary concise but informative.
"""
try:
extracted = await self.service_center.extractContentFromDocument(
summary_prompt,
document
)
if extracted and extracted.contents:
summary_content = "\n".join([item.data for item in extracted.contents])
return await self._create_reduced_document(document, summary_content)
else:
return document
except Exception as e:
logger.warning(f"Could not summarize document {document.fileName}: {str(e)}")
return document
async def _create_reduced_document(self, original_doc: ChatDocument, reduced_content: str) -> ChatDocument:
"""Create a new document with reduced content"""
try:
# Create new file with reduced content
file_id = self.service_center.createFile(
f"reduced_{original_doc.fileName}",
"text/plain",
reduced_content,
base64encoded=False
)
# Create new document
return self.service_center.createDocument(
f"reduced_{original_doc.fileName}",
"text/plain",
reduced_content,
base64encoded=False,
existing_file_id=file_id
)
except Exception as e:
logger.error(f"Could not create reduced document: {str(e)}")
return original_doc
def _reduce_prompt(self, prompt: str, target_reduction: float) -> str:
"""Reduce prompt size while preserving essential information"""
# Simple prompt reduction - keep first and last parts
lines = prompt.split('\n')
if len(lines) <= 3:
return prompt
# Keep first 30% and last 20% of lines
keep_start = int(len(lines) * 0.3)
keep_end = int(len(lines) * 0.2)
reduced_lines = lines[:keep_start] + ["... (content reduced) ..."] + lines[-keep_end:]
return '\n'.join(reduced_lines)

View file

@ -2,19 +2,377 @@ import logging
from typing import Dict, Any, List, Union, Optional
from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
from modules.connectors.connectorAiAnthropic import AiAnthropic
from modules.chat.documents.documentExtraction import DocumentExtraction
from modules.interfaces.interfaceChatModel import ChatDocument
logger = logging.getLogger(__name__)
# AI Model Registry with Performance Data
AI_MODELS = {
"openai_gpt4o": {
"connector": "openai",
"max_tokens": 128000,
"cost_per_1k_tokens": 0.03, # Input
"cost_per_1k_tokens_output": 0.06, # Output
"speed_rating": 8, # 1-10
"quality_rating": 9, # 1-10
"supports_images": True,
"supports_documents": True,
"context_length": 128000,
"model_name": "gpt-4o"
},
"openai_gpt35": {
"connector": "openai",
"max_tokens": 16000,
"cost_per_1k_tokens": 0.0015,
"cost_per_1k_tokens_output": 0.002,
"speed_rating": 9,
"quality_rating": 7,
"supports_images": False,
"supports_documents": True,
"context_length": 16000,
"model_name": "gpt-3.5-turbo"
},
"anthropic_claude": {
"connector": "anthropic",
"max_tokens": 200000,
"cost_per_1k_tokens": 0.015,
"cost_per_1k_tokens_output": 0.075,
"speed_rating": 7,
"quality_rating": 10,
"supports_images": True,
"supports_documents": True,
"context_length": 200000,
"model_name": "claude-3-sonnet-20240229"
}
}
class AiCalls:
"""Interface for AI service interactions"""
"""Interface for AI service interactions with centralized call method"""
def __init__(self):
self.openaiService = AiOpenai()
self.anthropicService = AiAnthropic()
self.document_extractor = DocumentExtraction()
async def callAi(
self,
prompt: str,
documents: List[ChatDocument] = None,
operation_type: str = "general",
priority: str = "balanced", # "speed", "quality", "cost", "balanced"
compress_prompt: bool = True,
compress_documents: bool = True,
process_documents_individually: bool = False,
max_cost: float = None,
max_processing_time: int = None
) -> str:
"""
Zentrale AI Call Methode mit intelligenter Modell-Auswahl und Content-Verarbeitung.
Args:
prompt: Der Hauptprompt für die AI
documents: Liste von Dokumenten zur Verarbeitung
operation_type: Art der Operation ("general", "document_analysis", "image_analysis", etc.)
priority: Priorität für Modell-Auswahl ("speed", "quality", "cost", "balanced")
compress_prompt: Ob der Prompt komprimiert werden soll
compress_documents: Ob Dokumente komprimiert werden sollen
process_documents_individually: Ob Dokumente einzeln verarbeitet werden sollen
max_cost: Maximale Kosten für den Call
max_processing_time: Maximale Verarbeitungszeit in Sekunden
Returns:
AI Response als String
"""
try:
# 1. Dokumente verarbeiten falls vorhanden
document_content = ""
if documents:
document_content = await self._process_documents_for_ai(
documents,
operation_type,
compress_documents,
process_documents_individually
)
# 2. Bestes Modell basierend auf Priorität und Content auswählen
selected_model = self._select_optimal_model(
prompt,
document_content,
priority,
operation_type,
max_cost,
max_processing_time
)
# 3. Content für das gewählte Modell optimieren
optimized_prompt, optimized_content = await self._optimize_content_for_model(
prompt,
document_content,
selected_model,
compress_prompt,
compress_documents
)
# 4. AI Call mit Failover ausführen
return await self._execute_ai_call_with_failover(
selected_model,
optimized_prompt,
optimized_content
)
except Exception as e:
logger.error(f"Error in centralized AI call: {str(e)}")
return f"Error: {str(e)}"
def _select_optimal_model(
self,
prompt: str,
document_content: str,
priority: str,
operation_type: str,
max_cost: float = None,
max_processing_time: int = None
) -> str:
"""Wählt das optimale Modell basierend auf Priorität und Content aus"""
# Content-Größe berechnen
total_content_size = len(prompt.encode('utf-8')) + len(document_content.encode('utf-8'))
# Verfügbare Modelle filtern
available_models = {}
for model_name, model_info in AI_MODELS.items():
# Prüfe ob Modell für Content-Größe geeignet ist
if total_content_size > model_info["context_length"] * 0.8: # 80% für Content
continue
# Prüfe Kosten-Limit
if max_cost:
estimated_cost = self._estimate_cost(model_info, total_content_size)
if estimated_cost > max_cost:
continue
# Prüfe Operation-Type Kompatibilität
if operation_type == "image_analysis" and not model_info["supports_images"]:
continue
available_models[model_name] = model_info
if not available_models:
# Fallback zum kleinsten Modell
return "openai_gpt35"
# Modell basierend auf Priorität auswählen
if priority == "speed":
return max(available_models.keys(), key=lambda x: available_models[x]["speed_rating"])
elif priority == "quality":
return max(available_models.keys(), key=lambda x: available_models[x]["quality_rating"])
elif priority == "cost":
return min(available_models.keys(), key=lambda x: available_models[x]["cost_per_1k_tokens"])
else: # balanced
# Gewichtete Bewertung: 40% Qualität, 30% Geschwindigkeit, 30% Kosten
def balanced_score(model_name):
model_info = available_models[model_name]
quality_score = model_info["quality_rating"] * 0.4
speed_score = model_info["speed_rating"] * 0.3
cost_score = (10 - (model_info["cost_per_1k_tokens"] * 1000)) * 0.3 # Niedrigere Kosten = höherer Score
return quality_score + speed_score + cost_score
return max(available_models.keys(), key=balanced_score)
def _estimate_cost(self, model_info: Dict, content_size: int) -> float:
"""Schätzt die Kosten für einen AI Call"""
# Grobe Schätzung: 1 Token ≈ 4 Zeichen
estimated_tokens = content_size / 4
input_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens"]
output_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens_output"] * 0.1 # 10% für Output
return input_cost + output_cost
async def _process_documents_for_ai(
self,
documents: List[ChatDocument],
operation_type: str,
compress_documents: bool,
process_individually: bool
) -> str:
"""Verarbeitet Dokumente für AI Call mit documentExtraction.py"""
if not documents:
return ""
processed_contents = []
for doc in documents:
try:
# Extrahiere Content mit documentExtraction.py
extracted = await self.document_extractor.processFileData(
doc.fileData,
doc.fileName,
doc.mimeType,
prompt=f"Extract relevant content for {operation_type}",
documentId=doc.id,
enableAI=True
)
# Kombiniere alle Content-Items
doc_content = []
for content_item in extracted.contents:
if content_item.data and content_item.data.strip():
doc_content.append(content_item.data)
if doc_content:
combined_doc_content = "\n\n".join(doc_content)
# Komprimiere falls gewünscht
if compress_documents and len(combined_doc_content.encode('utf-8')) > 10000: # 10KB Limit
combined_doc_content = await self._compress_content(
combined_doc_content,
10000,
"document"
)
processed_contents.append(f"Document: {doc.fileName}\n{combined_doc_content}")
except Exception as e:
logger.warning(f"Error processing document {doc.fileName}: {str(e)}")
processed_contents.append(f"Document: {doc.fileName}\n[Error processing document: {str(e)}]")
return "\n\n---\n\n".join(processed_contents)
async def _optimize_content_for_model(
self,
prompt: str,
document_content: str,
model_name: str,
compress_prompt: bool,
compress_documents: bool
) -> tuple[str, str]:
"""Optimiert Content für das gewählte Modell"""
model_info = AI_MODELS[model_name]
max_content_size = model_info["context_length"] * 0.7 # 70% für Content
optimized_prompt = prompt
optimized_content = document_content
# Prompt komprimieren falls gewünscht
if compress_prompt and len(prompt.encode('utf-8')) > 2000: # 2KB Limit für Prompt
optimized_prompt = await self._compress_content(prompt, 2000, "prompt")
# Dokument-Content komprimieren falls gewünscht
if compress_documents and document_content:
content_size = len(document_content.encode('utf-8'))
if content_size > max_content_size:
optimized_content = await self._compress_content(
document_content,
int(max_content_size),
"document"
)
return optimized_prompt, optimized_content
async def _compress_content(self, content: str, target_size: int, content_type: str) -> str:
"""Komprimiert Content intelligent basierend auf Typ"""
if len(content.encode('utf-8')) <= target_size:
return content
try:
# Verwende AI für intelligente Kompression
compression_prompt = f"""
Komprimiere den folgenden {content_type} auf maximal {target_size} Zeichen,
behalte aber alle wichtigen Informationen bei:
{content}
Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen.
"""
# Verwende das schnellste verfügbare Modell für Kompression
compression_model = "openai_gpt35"
model_info = AI_MODELS[compression_model]
connector = getattr(self, f"{model_info['connector']}Service")
messages = [{"role": "user", "content": compression_prompt}]
if model_info["connector"] == "openai":
compressed = await connector.callAiBasic(messages)
else:
response = await connector.callAiBasic(messages)
compressed = response["choices"][0]["message"]["content"]
return compressed
except Exception as e:
logger.warning(f"AI compression failed, using truncation: {str(e)}")
# Fallback: Einfache Truncation
return content[:target_size] + "... [truncated]"
async def _execute_ai_call_with_failover(
self,
model_name: str,
prompt: str,
document_content: str
) -> str:
"""Führt AI Call mit automatischem Failover aus"""
try:
model_info = AI_MODELS[model_name]
connector = getattr(self, f"{model_info['connector']}Service")
# Messages vorbereiten
messages = []
if document_content:
messages.append({
"role": "system",
"content": f"Context from documents:\n{document_content}"
})
messages.append({
"role": "user",
"content": prompt
})
# AI Call ausführen
if model_info["connector"] == "openai":
return await connector.callAiBasic(messages)
else: # anthropic
response = await connector.callAiBasic(messages)
return response["choices"][0]["message"]["content"]
except ContextLengthExceededException:
logger.warning(f"Context length exceeded for {model_name}, trying fallback")
# Fallback zu Modell mit größerem Context
fallback_model = self._find_fallback_model(model_name)
if fallback_model:
return await self._execute_ai_call_with_failover(fallback_model, prompt, document_content)
else:
# Letzter Ausweg: Content weiter komprimieren
compressed_prompt = await self._compress_content(prompt, 1000, "prompt")
compressed_content = await self._compress_content(document_content, 5000, "document")
return await self._execute_ai_call_with_failover("openai_gpt35", compressed_prompt, compressed_content)
except Exception as e:
logger.warning(f"AI call failed with {model_name}: {e}")
# Allgemeiner Fallback
return await self._execute_ai_call_with_failover("openai_gpt35", prompt, document_content)
def _find_fallback_model(self, current_model: str) -> Optional[str]:
"""Findet ein Fallback-Modell mit größerem Context"""
current_context = AI_MODELS[current_model]["context_length"]
# Suche Modell mit größerem Context
for model_name, model_info in AI_MODELS.items():
if model_info["context_length"] > current_context:
return model_name
return None
# Legacy methods
async def callAiTextBasic(self, prompt: str, context: Optional[str] = None) -> str:
"""
Basic text processing using OpenAI.
Basic text processing - now uses centralized AI call method.
Args:
prompt: The user prompt to process
@ -23,100 +381,47 @@ class AiCalls:
Returns:
The AI response as text
"""
# Prepare messages in OpenAI format
messages = []
# Add system message if context provided
# Combine context with prompt if provided
full_prompt = prompt
if context:
messages.append({
"role": "system",
"content": context
})
full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
# Add user message
messages.append({
"role": "user",
"content": prompt
})
# Add language instruction for user-facing responses
if hasattr(self, 'userLanguage') and self.userLanguage:
ltext = f"Please respond in '{self.userLanguage}' language."
if messages and messages[0]["role"] == "system":
if "language" not in messages[0]["content"].lower():
messages[0]["content"] = f"{ltext} {messages[0]['content']}"
else:
messages.insert(0, {
"role": "system",
"content": ltext
})
try:
return await self.openaiService.callAiBasic(messages)
except ContextLengthExceededException as e:
logger.warning(f"OpenAI context length exceeded, falling back to Anthropic: {str(e)}")
# Fallback to Anthropic (AI Advanced) when context length is exceeded
return await self.callAiTextAdvanced(prompt, context, _is_fallback=True)
except Exception as e:
logger.error(f"Error in OpenAI call: {str(e)}")
return f"Error: {str(e)}"
# Use centralized AI call with speed priority for basic calls
return await self.callAi(
prompt=full_prompt,
priority="speed",
compress_prompt=True,
compress_documents=False
)
async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str:
"""
Advanced text processing using Anthropic.
Fallback to OpenAI if Anthropic is overloaded or rate-limited.
Advanced text processing - now uses centralized AI call method.
Args:
prompt: The user prompt to process
context: Optional system context/prompt
_is_fallback: Internal flag to prevent infinite recursion
_is_fallback: Internal flag (kept for compatibility)
Returns:
The AI response as text
"""
# For Anthropic, we need to handle system content differently
# Anthropic expects system content in a top-level parameter, not as a message role
try:
# Create messages without system role for Anthropic
anthropic_messages = []
if hasattr(self, 'userLanguage') and self.userLanguage:
ltext = f"Please respond in '{self.userLanguage}' language."
if context:
# Combine context and language instruction
full_context = f"{ltext}\n\n{context}"
else:
full_context = ltext
else:
full_context = context
# Add user message
anthropic_messages.append({
"role": "user",
"content": prompt
})
# Call Anthropic - let the connector handle system content conversion
if full_context:
# Send context as part of the user message for Anthropic
enhanced_prompt = f"Context:\n{full_context}\n\nUser Request:\n{prompt}"
response = await self.anthropicService.callAiBasic([
{"role": "user", "content": enhanced_prompt}
])
else:
response = await self.anthropicService.callAiBasic(anthropic_messages)
return response["choices"][0]["message"]["content"]
except Exception as e:
err_str = str(e)
logger.warning(f"[UI NOTICE] Advanced AI failed, falling back to Basic AI (OpenAI). Reason: {err_str}")
# Fallback to OpenAI basic, but only if we're not already in a fallback
if not _is_fallback:
return await self.callAiTextBasic(prompt, context)
else:
# If we're already in a fallback, return error to prevent infinite recursion
logger.error("Both AI services failed, cannot provide fallback")
return f"Error: Both AI services failed. Anthropic error: {err_str}"
# Combine context with prompt if provided
full_prompt = prompt
if context:
full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
# Use centralized AI call with quality priority for advanced calls
return await self.callAi(
prompt=full_prompt,
priority="quality",
compress_prompt=False,
compress_documents=False
)
async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
"""
Basic image processing using OpenAI.
Basic image processing - now uses centralized AI call method.
Args:
prompt: The prompt for image analysis
@ -127,6 +432,8 @@ class AiCalls:
The AI response as text
"""
try:
# For image processing, use the original connector directly
# as the centralized method doesn't handle images yet
return await self.openaiService.callAiImage(prompt, imageData, mimeType)
except Exception as e:
logger.error(f"Error in OpenAI image call: {str(e)}")
@ -134,7 +441,7 @@ class AiCalls:
async def callAiImageAdvanced(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
"""
Advanced image processing using Anthropic.
Advanced image processing - now uses centralized AI call method.
Args:
prompt: The prompt for image analysis
@ -145,8 +452,76 @@ class AiCalls:
The AI response as text
"""
try:
# For image processing, use the original connector directly
# as the centralized method doesn't handle images yet
return await self.anthropicService.callAiImage(prompt, imageData, mimeType)
except Exception as e:
logger.error(f"Error in Anthropic image call: {str(e)}")
return f"Error: {str(e)}"
# Convenience methods for common use cases
async def callAiForDocumentAnalysis(
self,
prompt: str,
documents: List[ChatDocument],
priority: str = "balanced"
) -> str:
"""Convenience method for document analysis"""
return await self.callAi(
prompt=prompt,
documents=documents,
operation_type="document_analysis",
priority=priority,
compress_documents=True,
process_documents_individually=False
)
async def callAiForReportGeneration(
self,
prompt: str,
documents: List[ChatDocument],
priority: str = "quality"
) -> str:
"""Convenience method for report generation"""
return await self.callAi(
prompt=prompt,
documents=documents,
operation_type="report_generation",
priority=priority,
compress_documents=True,
process_documents_individually=True
)
async def callAiForEmailComposition(
self,
prompt: str,
documents: List[ChatDocument] = None,
priority: str = "speed"
) -> str:
"""Convenience method for email composition"""
return await self.callAi(
prompt=prompt,
documents=documents,
operation_type="email_composition",
priority=priority,
compress_prompt=True,
compress_documents=True
)
async def callAiForTaskPlanning(
self,
prompt: str,
documents: List[ChatDocument] = None,
priority: str = "balanced"
) -> str:
"""Convenience method for task planning"""
return await self.callAi(
prompt=prompt,
documents=documents,
operation_type="task_planning",
priority=priority,
compress_prompt=False,
compress_documents=True
)

View file

@ -1,115 +0,0 @@
"""
Centralized AI Engine Interface for intelligent content processing
"""
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union, Tuple
from enum import Enum
from dataclasses import dataclass
from modules.interfaces.interfaceChatModel import ChatDocument, ExtractedContent
class AIModelType(Enum):
"""Available AI model types"""
OPENAI_GPT4 = "openai_gpt4"
OPENAI_GPT35 = "openai_gpt35"
ANTHROPIC_CLAUDE = "anthropic_claude"
OPENAI_VISION = "openai_vision"
ANTHROPIC_VISION = "anthropic_vision"
class ProcessingStrategy(Enum):
"""Content processing strategies"""
SINGLE_CALL = "single_call" # One AI call with full content
DOCUMENT_BY_DOCUMENT = "doc_by_doc" # One call per document, merge results
CHUNKED_PROCESSING = "chunked" # Process in chunks, merge results
SUMMARIZED_CONTENT = "summarized" # Summarize content first, then process
class ContentReductionStrategy(Enum):
"""Content reduction strategies"""
REDUCE_DOCUMENTS_ONLY = "reduce_docs" # Keep prompt, reduce documents
REDUCE_PROMPT_AND_DOCS = "reduce_both" # Reduce both prompt and documents
SUMMARIZE_DOCUMENTS = "summarize_docs" # Summarize documents to key points
EXTRACT_KEY_INFO = "extract_key" # Extract only relevant information
@dataclass
class AIRequest:
"""Standardized AI request structure"""
prompt: str
documents: List[ChatDocument]
context: Optional[str] = None
preferred_model: Optional[AIModelType] = None
max_tokens: Optional[int] = None
temperature: Optional[float] = None
processing_strategy: Optional[ProcessingStrategy] = None
reduction_strategy: Optional[ContentReductionStrategy] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class AIResponse:
"""Standardized AI response structure"""
success: bool
content: str
model_used: AIModelType
processing_strategy: ProcessingStrategy
tokens_used: Optional[int] = None
processing_time: Optional[float] = None
error: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class ModelCapabilities:
"""AI model capabilities and limits"""
max_tokens: int
max_input_tokens: int
supports_vision: bool
supports_function_calling: bool
cost_per_1k_tokens: float
processing_speed: str # "fast", "medium", "slow"
class AIEngine(ABC):
"""Abstract AI Engine interface"""
@abstractmethod
async def process_request(self, request: AIRequest) -> AIResponse:
"""Process an AI request with intelligent content management"""
pass
@abstractmethod
def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities:
"""Get capabilities and limits for a specific model"""
pass
@abstractmethod
async def estimate_token_usage(self, request: AIRequest) -> int:
"""Estimate token usage for a request"""
pass
class ContentReducer(ABC):
"""Abstract content reduction interface"""
@abstractmethod
async def reduce_content(
self,
documents: List[ChatDocument],
prompt: str,
strategy: ContentReductionStrategy,
target_reduction: float = 0.5
) -> Tuple[List[ChatDocument], str]:
"""Reduce content size while preserving important information"""
pass
@abstractmethod
async def summarize_document(
self,
document: ChatDocument,
focus_prompt: str
) -> ChatDocument:
"""Create a summary of a document focused on specific aspects"""
pass