From 8726cd4fb8302902cd8ee6088a913e31d6c5a555 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 2 Sep 2025 21:11:32 +0200
Subject: [PATCH] Centralized AI
---
AI_ENGINE_MIGRATION_PLAN.md | 276 ------------
modules/chat/serviceCenter_ai_engine.py | 266 ------------
modules/engines/aiEngine.py | 544 ------------------------
modules/interfaces/interfaceAiCalls.py | 543 +++++++++++++++++++----
modules/interfaces/interfaceAiEngine.py | 115 -----
5 files changed, 459 insertions(+), 1285 deletions(-)
delete mode 100644 AI_ENGINE_MIGRATION_PLAN.md
delete mode 100644 modules/chat/serviceCenter_ai_engine.py
delete mode 100644 modules/engines/aiEngine.py
delete mode 100644 modules/interfaces/interfaceAiEngine.py
diff --git a/AI_ENGINE_MIGRATION_PLAN.md b/AI_ENGINE_MIGRATION_PLAN.md
deleted file mode 100644
index 71077e6f..00000000
--- a/AI_ENGINE_MIGRATION_PLAN.md
+++ /dev/null
@@ -1,276 +0,0 @@
-# AI Engine Migration Plan
-
-## Overview
-This document outlines the migration strategy from the current AI call system to the new Smart AI Engine architecture.
-
-## Benefits of the New Architecture
-
-### 1. **Separation of Concerns**
-- Applications no longer need to worry about content size limits
-- Centralized AI model selection and failover
-- Intelligent content reduction strategies
-
-### 2. **Improved Reliability**
-- Automatic handling of "content too large" errors
-- Multiple fallback strategies
-- Model-specific optimization
-
-### 3. **Better Performance**
-- Optimal model selection based on content characteristics
-- Intelligent chunking and processing strategies
-- Reduced API costs through smart model selection
-
-### 4. **Enhanced Maintainability**
-- Single point of AI logic
-- Easy to add new models and strategies
-- Consistent error handling
-
-## Migration Phases
-
-### Phase 1: Infrastructure Setup (Week 1-2)
-1. **Create AI Engine Interface**
- - ✅ `interfaceAiEngine.py` - Core interfaces and data structures
- - ✅ `aiEngine.py` - Smart AI Engine implementation
- - ✅ `serviceCenter_ai_engine.py` - ServiceCenter integration
-
-2. **Update Dependencies**
- - Add new imports to existing modules
- - Update configuration for AI model selection
- - Add logging for AI engine operations
-
-### Phase 2: ServiceCenter Integration (Week 3)
-1. **Update ServiceCenter Class**
- ```python
- # Add to ServiceCenter.__init__
- self.ai_engine = ServiceCenterAIEngine(self)
-
- # Replace existing AI methods
- async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
- return await self.ai_engine.callAiTextAdvanced(prompt, context)
-
- async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
- return await self.ai_engine.callAiTextBasic(prompt, context)
-
- async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
- return await self.ai_engine.extractContentFromDocument(prompt, document)
-
- async def summarizeChat(self, messages: List[ChatMessage]) -> str:
- return await self.ai_engine.summarizeChat(messages)
- ```
-
-2. **Add New Document-Aware Methods**
- ```python
- async def callAiWithDocuments(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- operation_type: str = "general"
- ) -> str:
- return await self.ai_engine.callAiWithDocuments(
- prompt, documents, operation_type=operation_type
- )
- ```
-
-### Phase 3: Method Updates (Week 4-5)
-1. **Update MethodWeb.py**
- ```python
- # Before
- web_scrape_result = await web_interface.scrape(web_scrape_request)
-
- # After - no changes needed, but can be enhanced
- # The AI engine will automatically handle large content
- ```
-
-2. **Update MethodDocument.py**
- ```python
- # Before
- formatted_content = await self.service.callAiTextBasic(ai_prompt, content)
-
- # After
- formatted_content = await self.service.callAiForReportGeneration(
- prompt=ai_prompt,
- documents=chat_documents
- )
- ```
-
-3. **Update MethodAi.py**
- ```python
- # Before
- result = await self.service.callAiTextAdvanced(enhanced_prompt, context)
-
- # After
- result = await self.service.callAiWithDocuments(
- prompt=enhanced_prompt,
- documents=document_list,
- operation_type="ai_processing"
- )
- ```
-
-4. **Update MethodOutlook.py**
- ```python
- # Before
- composed_email = await self.service.interfaceAiCalls.callAiTextAdvanced(ai_prompt)
-
- # After
- composed_email = await self.service.callAiForEmailComposition(
- prompt=ai_prompt,
- documents=attached_documents
- )
- ```
-
-### Phase 4: Task Handling Updates (Week 6)
-1. **Update handlingTasks.py**
- ```python
- # Before
- prompt = await self.service.callAiTextAdvanced(task_planning_prompt)
-
- # After
- prompt = await self.service.callAiForTaskPlanning(
- prompt=task_planning_prompt,
- documents=available_documents,
- context=workflow_context
- )
- ```
-
-2. **Update promptFactory.py**
- ```python
- # Before
- messageSummary = await service.summarizeChat(context.workflow.messages)
-
- # After - no changes needed, method signature stays the same
- # But internally uses the new AI engine
- ```
-
-### Phase 5: Testing and Optimization (Week 7-8)
-1. **Unit Tests**
- - Test AI engine with various content sizes
- - Test fallback strategies
- - Test model selection logic
-
-2. **Integration Tests**
- - Test with real documents of various sizes
- - Test error scenarios
- - Test performance improvements
-
-3. **Performance Monitoring**
- - Monitor AI call success rates
- - Monitor processing times
- - Monitor cost savings
-
-## Code Changes Required
-
-### 1. ServiceCenter Updates
-```python
-# Add to ServiceCenter.__init__
-from modules.chat.serviceCenter_ai_engine import ServiceCenterAIEngine
-self.ai_engine_wrapper = ServiceCenterAIEngine(self)
-
-# Update existing methods to use AI engine
-async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
- return await self.ai_engine_wrapper.callAiTextAdvanced(prompt, context)
-
-async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
- return await self.ai_engine_wrapper.callAiTextBasic(prompt, context)
-
-async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
- return await self.ai_engine_wrapper.extractContentFromDocument(prompt, document)
-
-async def summarizeChat(self, messages: List[ChatMessage]) -> str:
- return await self.ai_engine_wrapper.summarizeChat(messages)
-```
-
-### 2. Method Updates (Optional Enhancements)
-```python
-# Enhanced method calls with document awareness
-async def process_documents_with_ai(self, prompt: str, documents: List[ChatDocument]):
- return await self.service.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- operation_type="document_processing"
- )
-```
-
-### 3. Configuration Updates
-```ini
-# Add to config.ini
-[AI_ENGINE]
-DEFAULT_MODEL=anthropic_claude
-FALLBACK_MODEL=openai_gpt35
-MAX_CONTENT_SIZE=100000
-ENABLE_CONTENT_REDUCTION=true
-CONTENT_REDUCTION_THRESHOLD=0.8
-```
-
-## Backward Compatibility
-
-### 1. **Method Signatures**
-- All existing method signatures remain unchanged
-- Internal implementation uses new AI engine
-- No breaking changes for existing code
-
-### 2. **Error Handling**
-- Same error types and messages
-- Enhanced error recovery with fallback strategies
-- Better error reporting with processing details
-
-### 3. **Performance**
-- Same or better performance
-- Automatic optimization based on content
-- Reduced API costs through smart model selection
-
-## Risk Mitigation
-
-### 1. **Gradual Rollout**
-- Deploy with feature flags
-- A/B testing with subset of users
-- Rollback capability
-
-### 2. **Monitoring**
-- Comprehensive logging of AI engine operations
-- Performance metrics tracking
-- Error rate monitoring
-
-### 3. **Fallback Strategy**
-- Keep original AI call methods as backup
-- Automatic fallback to original methods on errors
-- Manual override capability
-
-## Expected Benefits
-
-### 1. **Immediate Benefits**
-- Elimination of "content too large" errors
-- Better handling of large documents
-- Improved user experience
-
-### 2. **Long-term Benefits**
-- Easier addition of new AI models
-- Better cost optimization
-- Enhanced content processing capabilities
-- Improved system reliability
-
-### 3. **Developer Benefits**
-- Simplified AI integration
-- No need to worry about content size limits
-- Consistent AI behavior across the system
-- Better debugging and monitoring
-
-## Success Metrics
-
-### 1. **Error Reduction**
-- 90% reduction in "content too large" errors
-- 50% reduction in AI call failures
-- 95% success rate for document processing
-
-### 2. **Performance Improvement**
-- 20% faster processing for large documents
-- 30% reduction in API costs
-- 50% reduction in retry attempts
-
-### 3. **User Experience**
-- Faster response times
-- More reliable document processing
-- Better content extraction quality
-
-## Conclusion
-
-The new AI Engine architecture provides a robust, scalable solution for handling AI calls with large content. The migration can be done gradually with full backward compatibility, ensuring minimal risk while providing significant benefits in reliability, performance, and maintainability.
diff --git a/modules/chat/serviceCenter_ai_engine.py b/modules/chat/serviceCenter_ai_engine.py
deleted file mode 100644
index a33a5813..00000000
--- a/modules/chat/serviceCenter_ai_engine.py
+++ /dev/null
@@ -1,266 +0,0 @@
-"""
-ServiceCenter integration with Smart AI Engine
-"""
-
-import logging
-from typing import List, Dict, Any, Optional
-from modules.interfaces.interfaceChatModel import ChatDocument
-from modules.interfaces.interfaceAiEngine import (
- AIRequest, AIResponse, AIModelType, ProcessingStrategy,
- ContentReductionStrategy
-)
-from modules.engines.aiEngine import SmartAIEngine
-
-logger = logging.getLogger(__name__)
-
-
-class ServiceCenterAIEngine:
- """ServiceCenter integration with Smart AI Engine"""
-
- def __init__(self, service_center):
- self.service_center = service_center
- self.ai_engine = SmartAIEngine(service_center)
-
- async def callAiWithDocuments(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- context: str = None,
- preferred_model: AIModelType = None,
- operation_type: str = "general",
- processing_strategy: ProcessingStrategy = None,
- reduction_strategy: ContentReductionStrategy = None,
- **kwargs
- ) -> str:
- """
- Unified AI call method that handles documents and prompts separately
-
- Args:
- prompt: The AI prompt
- documents: List of documents to process
- context: Additional context
- preferred_model: Preferred AI model
- operation_type: Type of operation (for strategy selection)
- processing_strategy: Explicit processing strategy
- reduction_strategy: Explicit content reduction strategy
- **kwargs: Additional parameters
-
- Returns:
- AI response content
- """
- try:
- # Create AI request
- request = AIRequest(
- prompt=prompt,
- documents=documents or [],
- context=context,
- preferred_model=preferred_model,
- processing_strategy=processing_strategy,
- reduction_strategy=reduction_strategy,
- metadata={
- "operation_type": operation_type,
- **kwargs
- }
- )
-
- # Process request
- response = await self.ai_engine.process_request(request)
-
- if response.success:
- return response.content
- else:
- raise Exception(f"AI processing failed: {response.error}")
-
- except Exception as e:
- logger.error(f"Error in AI call with documents: {str(e)}")
- raise e
-
- # Convenience methods for different operation types
-
- async def callAiForTaskPlanning(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- context: str = None
- ) -> str:
- """AI call optimized for task planning"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="task_planning",
- preferred_model=AIModelType.ANTHROPIC_CLAUDE # Better for complex planning
- )
-
- async def callAiForActionDefinition(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- context: str = None
- ) -> str:
- """AI call optimized for action definition"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="action_definition",
- preferred_model=AIModelType.ANTHROPIC_CLAUDE
- )
-
- async def callAiForDocumentExtraction(
- self,
- prompt: str,
- documents: List[ChatDocument],
- context: str = None
- ) -> str:
- """AI call optimized for document extraction"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="document_extraction",
- processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT
- )
-
- async def callAiForReportGeneration(
- self,
- prompt: str,
- documents: List[ChatDocument],
- context: str = None
- ) -> str:
- """AI call optimized for report generation"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="report_generation",
- processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING
- )
-
- async def callAiForEmailComposition(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- context: str = None
- ) -> str:
- """AI call optimized for email composition"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="email_composition",
- preferred_model=AIModelType.OPENAI_GPT4 # Better for creative writing
- )
-
- async def callAiForChatSummarization(
- self,
- prompt: str,
- documents: List[ChatDocument] = None,
- context: str = None
- ) -> str:
- """AI call optimized for chat summarization"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="chat_summarization",
- processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT
- )
-
- async def callAiForImageAnalysis(
- self,
- prompt: str,
- documents: List[ChatDocument],
- context: str = None
- ) -> str:
- """AI call optimized for image analysis"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- documents=documents,
- context=context,
- operation_type="image_analysis",
- preferred_model=AIModelType.OPENAI_VISION,
- requires_vision=True
- )
-
- # Backward compatibility methods
-
- async def callAiTextAdvanced(self, prompt: str, context: str = None) -> str:
- """Backward compatibility method"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- context=context,
- operation_type="general",
- preferred_model=AIModelType.ANTHROPIC_CLAUDE
- )
-
- async def callAiTextBasic(self, prompt: str, context: str = None) -> str:
- """Backward compatibility method"""
- return await self.callAiWithDocuments(
- prompt=prompt,
- context=context,
- operation_type="general",
- preferred_model=AIModelType.OPENAI_GPT35
- )
-
- async def callAiImageBasic(self, prompt: str, image_data: str, mime_type: str) -> str:
- """Backward compatibility method for image processing"""
- # Create a document from image data
- image_doc = self.service_center.createDocument(
- "image_analysis.jpg",
- mime_type,
- image_data,
- base64encoded=True
- )
-
- return await self.callAiForImageAnalysis(
- prompt=prompt,
- documents=[image_doc]
- )
-
- async def extractContentFromDocument(self, prompt: str, document: ChatDocument) -> str:
- """Enhanced document extraction using AI engine"""
- try:
- return await self.callAiForDocumentExtraction(
- prompt=prompt,
- documents=[document]
- )
- except Exception as e:
- logger.error(f"Error in enhanced document extraction: {str(e)}")
- # Fall back to original method
- from modules.interfaces.interfaceChatModel import ExtractedContent
- extracted = await self.service_center.documentProcessor.processFileData(
- fileData=self.service_center.getFileData(document.fileId),
- fileName=document.fileName,
- mimeType=document.mimeType,
- prompt=prompt,
- documentId=document.id
- )
- if extracted and extracted.contents:
- return "\n".join([item.data for item in extracted.contents])
- return ""
-
- async def summarizeChat(self, messages: List) -> str:
- """Enhanced chat summarization using AI engine"""
- try:
- # Convert messages to a simple text format
- chat_content = "\n".join([f"{msg.role}: {msg.message}" for msg in messages if hasattr(msg, 'message')])
-
- # Create a document from chat content
- chat_doc = self.service_center.createDocument(
- "chat_history.txt",
- "text/plain",
- chat_content,
- base64encoded=False
- )
-
- return await self.callAiForChatSummarization(
- prompt="Summarize this chat conversation, focusing on key decisions, outcomes, and next steps.",
- documents=[chat_doc]
- )
- except Exception as e:
- logger.error(f"Error in enhanced chat summarization: {str(e)}")
- # Fall back to original method
- return await self.service_center.callAiTextBasic(
- f"Summarize this chat conversation: {chat_content}"
- )
diff --git a/modules/engines/aiEngine.py b/modules/engines/aiEngine.py
deleted file mode 100644
index f2f74c27..00000000
--- a/modules/engines/aiEngine.py
+++ /dev/null
@@ -1,544 +0,0 @@
-"""
-Smart AI Engine with intelligent content management and model selection
-"""
-
-import logging
-import asyncio
-from typing import List, Dict, Any, Optional, Tuple
-from modules.interfaces.interfaceAiEngine import (
- AIEngine, AIRequest, AIResponse, AIModelType, ProcessingStrategy,
- ContentReductionStrategy, ModelCapabilities, ContentReducer
-)
-from modules.interfaces.interfaceChatModel import ChatDocument
-from modules.interfaces.interfaceAiCalls import AiCalls
-from modules.chat.documents.documentExtraction import DocumentExtraction
-from modules.shared.configuration import APP_CONFIG
-
-logger = logging.getLogger(__name__)
-
-
-class SmartAIEngine(AIEngine):
- """Smart AI Engine with automatic content management and model selection"""
-
- def __init__(self, service_center=None):
- self.service_center = service_center
- self.ai_calls = AiCalls()
- self.document_processor = DocumentExtraction(service_center)
- self.content_reducer = SmartContentReducer(service_center)
-
- # Model capabilities mapping
- self.model_capabilities = {
- AIModelType.OPENAI_GPT4: ModelCapabilities(
- max_tokens=8192,
- max_input_tokens=128000,
- supports_vision=False,
- supports_function_calling=True,
- cost_per_1k_tokens=0.03,
- processing_speed="medium"
- ),
- AIModelType.OPENAI_GPT35: ModelCapabilities(
- max_tokens=4096,
- max_input_tokens=16384,
- supports_vision=False,
- supports_function_calling=True,
- cost_per_1k_tokens=0.002,
- processing_speed="fast"
- ),
- AIModelType.ANTHROPIC_CLAUDE: ModelCapabilities(
- max_tokens=4096,
- max_input_tokens=200000,
- supports_vision=False,
- supports_function_calling=False,
- cost_per_1k_tokens=0.015,
- processing_speed="medium"
- ),
- AIModelType.OPENAI_VISION: ModelCapabilities(
- max_tokens=4096,
- max_input_tokens=128000,
- supports_vision=True,
- supports_function_calling=False,
- cost_per_1k_tokens=0.01,
- processing_speed="slow"
- )
- }
-
- # Processing strategy preferences
- self.strategy_preferences = {
- "task_planning": ProcessingStrategy.SINGLE_CALL,
- "action_definition": ProcessingStrategy.SINGLE_CALL,
- "document_extraction": ProcessingStrategy.DOCUMENT_BY_DOCUMENT,
- "report_generation": ProcessingStrategy.CHUNKED_PROCESSING,
- "email_composition": ProcessingStrategy.SINGLE_CALL,
- "chat_summarization": ProcessingStrategy.SUMMARIZED_CONTENT
- }
-
- async def process_request(self, request: AIRequest) -> AIResponse:
- """Process AI request with intelligent content management"""
- try:
- # Step 1: Determine optimal processing strategy
- strategy = self._determine_processing_strategy(request)
- request.processing_strategy = strategy
-
- # Step 2: Estimate token usage
- estimated_tokens = await self.estimate_token_usage(request)
-
- # Step 3: Select appropriate model
- model = self._select_optimal_model(request, estimated_tokens)
-
- # Step 4: Process with selected strategy
- if strategy == ProcessingStrategy.SINGLE_CALL:
- return await self._process_single_call(request, model)
- elif strategy == ProcessingStrategy.DOCUMENT_BY_DOCUMENT:
- return await self._process_document_by_document(request, model)
- elif strategy == ProcessingStrategy.CHUNKED_PROCESSING:
- return await self._process_chunked(request, model)
- elif strategy == ProcessingStrategy.SUMMARIZED_CONTENT:
- return await self._process_with_summarization(request, model)
- else:
- raise ValueError(f"Unknown processing strategy: {strategy}")
-
- except Exception as e:
- logger.error(f"Error processing AI request: {str(e)}")
- return AIResponse(
- success=False,
- content="",
- model_used=AIModelType.OPENAI_GPT35,
- processing_strategy=ProcessingStrategy.SINGLE_CALL,
- error=str(e)
- )
-
- def _determine_processing_strategy(self, request: AIRequest) -> ProcessingStrategy:
- """Determine the best processing strategy based on request characteristics"""
-
- # Use explicit strategy if provided
- if request.processing_strategy:
- return request.processing_strategy
-
- # Determine based on metadata or content characteristics
- metadata = request.metadata or {}
- operation_type = metadata.get("operation_type", "general")
-
- # Check if we have a preference for this operation type
- if operation_type in self.strategy_preferences:
- return self.strategy_preferences[operation_type]
-
- # Auto-determine based on content characteristics
- num_documents = len(request.documents)
- prompt_length = len(request.prompt)
-
- if num_documents == 0:
- return ProcessingStrategy.SINGLE_CALL
- elif num_documents == 1:
- return ProcessingStrategy.SINGLE_CALL
- elif num_documents <= 3 and prompt_length < 1000:
- return ProcessingStrategy.SINGLE_CALL
- elif num_documents > 5:
- return ProcessingStrategy.DOCUMENT_BY_DOCUMENT
- else:
- return ProcessingStrategy.CHUNKED_PROCESSING
-
- def _select_optimal_model(self, request: AIRequest, estimated_tokens: int) -> AIModelType:
- """Select the optimal AI model based on request characteristics"""
-
- # Use preferred model if specified and suitable
- if request.preferred_model:
- capabilities = self.get_model_capabilities(request.preferred_model)
- if estimated_tokens <= capabilities.max_input_tokens:
- return request.preferred_model
-
- # Select model based on requirements
- metadata = request.metadata or {}
- requires_vision = metadata.get("requires_vision", False)
- requires_function_calling = metadata.get("requires_function_calling", False)
-
- # Filter models by requirements
- suitable_models = []
- for model, capabilities in self.model_capabilities.items():
- if estimated_tokens <= capabilities.max_input_tokens:
- if requires_vision and not capabilities.supports_vision:
- continue
- if requires_function_calling and not capabilities.supports_function_calling:
- continue
- suitable_models.append((model, capabilities))
-
- if not suitable_models:
- # If no model can handle the full content, use the one with highest capacity
- best_model = max(self.model_capabilities.items(),
- key=lambda x: x[1].max_input_tokens)
- logger.warning(f"No model can handle {estimated_tokens} tokens, using {best_model[0]}")
- return best_model[0]
-
- # Select based on cost and speed preferences
- # For now, prefer Claude for large content, GPT-4 for complex tasks, GPT-3.5 for simple tasks
- if estimated_tokens > 50000:
- return AIModelType.ANTHROPIC_CLAUDE
- elif metadata.get("complex_task", False):
- return AIModelType.OPENAI_GPT4
- else:
- return AIModelType.OPENAI_GPT35
-
- async def _process_single_call(self, request: AIRequest, model: AIModelType) -> AIResponse:
- """Process request with a single AI call"""
- try:
- # Prepare content
- content = await self._prepare_content_for_single_call(request)
-
- # Make AI call
- if model in [AIModelType.OPENAI_GPT4, AIModelType.OPENAI_GPT35]:
- response = await self.ai_calls.callAiTextAdvanced(content, request.context)
- elif model == AIModelType.ANTHROPIC_CLAUDE:
- response = await self.ai_calls.callAiTextAdvanced(content, request.context)
- else:
- raise ValueError(f"Unsupported model for single call: {model}")
-
- return AIResponse(
- success=True,
- content=response,
- model_used=model,
- processing_strategy=ProcessingStrategy.SINGLE_CALL
- )
-
- except Exception as e:
- # If single call fails due to size, try with content reduction
- if "too large" in str(e).lower() or "400" in str(e):
- return await self._process_with_content_reduction(request, model)
- else:
- raise e
-
- async def _process_document_by_document(self, request: AIRequest, model: AIModelType) -> AIResponse:
- """Process each document separately and merge results"""
- try:
- results = []
-
- for i, document in enumerate(request.documents):
- # Create individual request for each document
- doc_request = AIRequest(
- prompt=request.prompt,
- documents=[document],
- context=request.context,
- preferred_model=model,
- metadata=request.metadata
- )
-
- # Process document
- doc_response = await self._process_single_call(doc_request, model)
- if doc_response.success:
- results.append(f"Document {i+1} ({document.fileName}):\n{doc_response.content}")
- else:
- results.append(f"Document {i+1} ({document.fileName}): Error - {doc_response.error}")
-
- # Merge results
- merged_content = "\n\n".join(results)
-
- return AIResponse(
- success=True,
- content=merged_content,
- model_used=model,
- processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT
- )
-
- except Exception as e:
- logger.error(f"Error in document-by-document processing: {str(e)}")
- return AIResponse(
- success=False,
- content="",
- model_used=model,
- processing_strategy=ProcessingStrategy.DOCUMENT_BY_DOCUMENT,
- error=str(e)
- )
-
- async def _process_chunked(self, request: AIRequest, model: AIModelType) -> AIResponse:
- """Process content in chunks and merge results"""
- try:
- # This would implement chunked processing logic
- # For now, fall back to document-by-document
- return await self._process_document_by_document(request, model)
-
- except Exception as e:
- logger.error(f"Error in chunked processing: {str(e)}")
- return AIResponse(
- success=False,
- content="",
- model_used=model,
- processing_strategy=ProcessingStrategy.CHUNKED_PROCESSING,
- error=str(e)
- )
-
- async def _process_with_summarization(self, request: AIRequest, model: AIModelType) -> AIResponse:
- """Process with content summarization first"""
- try:
- # Summarize documents first
- summarized_docs = []
- for document in request.documents:
- summary_doc = await self.content_reducer.summarize_document(
- document,
- f"Summarize this document for: {request.prompt}"
- )
- summarized_docs.append(summary_doc)
-
- # Create new request with summarized documents
- summary_request = AIRequest(
- prompt=request.prompt,
- documents=summarized_docs,
- context=request.context,
- preferred_model=model,
- metadata=request.metadata
- )
-
- # Process with summarized content
- return await self._process_single_call(summary_request, model)
-
- except Exception as e:
- logger.error(f"Error in summarization processing: {str(e)}")
- return AIResponse(
- success=False,
- content="",
- model_used=model,
- processing_strategy=ProcessingStrategy.SUMMARIZED_CONTENT,
- error=str(e)
- )
-
- async def _process_with_content_reduction(self, request: AIRequest, model: AIModelType) -> AIResponse:
- """Process with automatic content reduction"""
- try:
- # Determine reduction strategy
- strategy = self._determine_reduction_strategy(request)
-
- # Reduce content
- reduced_docs, reduced_prompt = await self.content_reducer.reduce_content(
- request.documents,
- request.prompt,
- strategy,
- target_reduction=0.5
- )
-
- # Create new request with reduced content
- reduced_request = AIRequest(
- prompt=reduced_prompt,
- documents=reduced_docs,
- context=request.context,
- preferred_model=model,
- metadata=request.metadata
- )
-
- # Try processing with reduced content
- return await self._process_single_call(reduced_request, model)
-
- except Exception as e:
- logger.error(f"Error in content reduction processing: {str(e)}")
- return AIResponse(
- success=False,
- content="",
- model_used=model,
- processing_strategy=ProcessingStrategy.SINGLE_CALL,
- error=f"Content reduction failed: {str(e)}"
- )
-
- def _determine_reduction_strategy(self, request: AIRequest) -> ContentReductionStrategy:
- """Determine the best content reduction strategy"""
-
- # Use explicit strategy if provided
- if request.reduction_strategy:
- return request.reduction_strategy
-
- # Auto-determine based on request characteristics
- metadata = request.metadata or {}
- operation_type = metadata.get("operation_type", "general")
-
- # Different strategies for different operations
- if operation_type in ["task_planning", "action_definition"]:
- # For planning tasks, prompt is crucial
- return ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY
- elif operation_type in ["document_extraction", "report_generation"]:
- # For document processing, documents are crucial
- return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS
- else:
- # Default: reduce both
- return ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS
-
- async def _prepare_content_for_single_call(self, request: AIRequest) -> str:
- """Prepare content for a single AI call"""
- content_parts = [request.prompt]
-
- if request.context:
- content_parts.append(f"Context: {request.context}")
-
- # Add document content
- for i, document in enumerate(request.documents):
- try:
- # Extract document content
- extracted = await self.service_center.extractContentFromDocument(
- "Extract all relevant text content",
- document
- )
-
- if extracted and extracted.contents:
- doc_content = "\n".join([item.data for item in extracted.contents])
- content_parts.append(f"Document {i+1} ({document.fileName}):\n{doc_content}")
- else:
- content_parts.append(f"Document {i+1} ({document.fileName}): [No content extracted]")
-
- except Exception as e:
- logger.warning(f"Could not extract content from document {document.fileName}: {str(e)}")
- content_parts.append(f"Document {i+1} ({document.fileName}): [Error extracting content]")
-
- return "\n\n".join(content_parts)
-
- def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities:
- """Get capabilities for a specific model"""
- return self.model_capabilities.get(model, self.model_capabilities[AIModelType.OPENAI_GPT35])
-
- async def estimate_token_usage(self, request: AIRequest) -> int:
- """Estimate token usage for a request"""
- # Simple estimation: ~4 characters per token
- prompt_tokens = len(request.prompt) // 4
- context_tokens = len(request.context or "") // 4
-
- # Estimate document tokens
- doc_tokens = 0
- for document in request.documents:
- # Rough estimate based on file size
- doc_tokens += document.fileSize // 4
-
- return prompt_tokens + context_tokens + doc_tokens
-
-
-class SmartContentReducer(ContentReducer):
- """Smart content reducer using document extraction engine"""
-
- def __init__(self, service_center):
- self.service_center = service_center
- self.document_processor = DocumentExtraction(service_center)
-
- async def reduce_content(
- self,
- documents: List[ChatDocument],
- prompt: str,
- strategy: ContentReductionStrategy,
- target_reduction: float = 0.5
- ) -> Tuple[List[ChatDocument], str]:
- """Reduce content size while preserving important information"""
-
- reduced_docs = []
- reduced_prompt = prompt
-
- # Sort documents by size (largest first)
- sorted_docs = sorted(documents, key=lambda d: d.fileSize, reverse=True)
-
- for document in sorted_docs:
- try:
- # Create reduction prompt based on strategy
- if strategy == ContentReductionStrategy.REDUCE_DOCUMENTS_ONLY:
- reduction_prompt = f"""
- Summarize this document to {int(100 * (1 - target_reduction))}% of its original size.
- Focus on the most important information relevant to: {prompt}
- Preserve key facts, data, and conclusions.
- """
- elif strategy == ContentReductionStrategy.SUMMARIZE_DOCUMENTS:
- reduction_prompt = f"""
- Create a concise summary of this document focusing on: {prompt}
- Include only the most relevant information.
- """
- else: # REDUCE_PROMPT_AND_DOCS or EXTRACT_KEY_INFO
- reduction_prompt = f"""
- Extract only the key information from this document that is relevant to: {prompt}
- Be very selective and concise.
- """
-
- # Process document with reduction
- extracted = await self.service_center.extractContentFromDocument(
- reduction_prompt,
- document
- )
-
- if extracted and extracted.contents:
- # Create new document with reduced content
- reduced_content = "\n".join([item.data for item in extracted.contents])
- reduced_doc = await self._create_reduced_document(document, reduced_content)
- reduced_docs.append(reduced_doc)
- else:
- # If reduction fails, keep original document
- reduced_docs.append(document)
-
- except Exception as e:
- logger.warning(f"Could not reduce document {document.fileName}: {str(e)}")
- reduced_docs.append(document)
-
- # Reduce prompt if strategy requires it
- if strategy in [ContentReductionStrategy.REDUCE_PROMPT_AND_DOCS]:
- reduced_prompt = self._reduce_prompt(prompt, target_reduction)
-
- return reduced_docs, reduced_prompt
-
- async def summarize_document(
- self,
- document: ChatDocument,
- focus_prompt: str
- ) -> ChatDocument:
- """Create a summary of a document focused on specific aspects"""
-
- summary_prompt = f"""
- Create a comprehensive summary of this document focusing on: {focus_prompt}
-
- Include:
- - Key points and main ideas
- - Important data and statistics
- - Conclusions and recommendations
- - Any relevant details
-
- Keep the summary concise but informative.
- """
-
- try:
- extracted = await self.service_center.extractContentFromDocument(
- summary_prompt,
- document
- )
-
- if extracted and extracted.contents:
- summary_content = "\n".join([item.data for item in extracted.contents])
- return await self._create_reduced_document(document, summary_content)
- else:
- return document
-
- except Exception as e:
- logger.warning(f"Could not summarize document {document.fileName}: {str(e)}")
- return document
-
- async def _create_reduced_document(self, original_doc: ChatDocument, reduced_content: str) -> ChatDocument:
- """Create a new document with reduced content"""
- try:
- # Create new file with reduced content
- file_id = self.service_center.createFile(
- f"reduced_{original_doc.fileName}",
- "text/plain",
- reduced_content,
- base64encoded=False
- )
-
- # Create new document
- return self.service_center.createDocument(
- f"reduced_{original_doc.fileName}",
- "text/plain",
- reduced_content,
- base64encoded=False,
- existing_file_id=file_id
- )
-
- except Exception as e:
- logger.error(f"Could not create reduced document: {str(e)}")
- return original_doc
-
- def _reduce_prompt(self, prompt: str, target_reduction: float) -> str:
- """Reduce prompt size while preserving essential information"""
- # Simple prompt reduction - keep first and last parts
- lines = prompt.split('\n')
- if len(lines) <= 3:
- return prompt
-
- # Keep first 30% and last 20% of lines
- keep_start = int(len(lines) * 0.3)
- keep_end = int(len(lines) * 0.2)
-
- reduced_lines = lines[:keep_start] + ["... (content reduced) ..."] + lines[-keep_end:]
- return '\n'.join(reduced_lines)
diff --git a/modules/interfaces/interfaceAiCalls.py b/modules/interfaces/interfaceAiCalls.py
index fe93105f..f0bb67b4 100644
--- a/modules/interfaces/interfaceAiCalls.py
+++ b/modules/interfaces/interfaceAiCalls.py
@@ -2,19 +2,377 @@ import logging
from typing import Dict, Any, List, Union, Optional
from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
from modules.connectors.connectorAiAnthropic import AiAnthropic
+from modules.chat.documents.documentExtraction import DocumentExtraction
+from modules.interfaces.interfaceChatModel import ChatDocument
logger = logging.getLogger(__name__)
+# AI Model Registry with Performance Data
+AI_MODELS = {
+ "openai_gpt4o": {
+ "connector": "openai",
+ "max_tokens": 128000,
+ "cost_per_1k_tokens": 0.03, # Input
+ "cost_per_1k_tokens_output": 0.06, # Output
+ "speed_rating": 8, # 1-10
+ "quality_rating": 9, # 1-10
+ "supports_images": True,
+ "supports_documents": True,
+ "context_length": 128000,
+ "model_name": "gpt-4o"
+ },
+ "openai_gpt35": {
+ "connector": "openai",
+ "max_tokens": 16000,
+ "cost_per_1k_tokens": 0.0015,
+ "cost_per_1k_tokens_output": 0.002,
+ "speed_rating": 9,
+ "quality_rating": 7,
+ "supports_images": False,
+ "supports_documents": True,
+ "context_length": 16000,
+ "model_name": "gpt-3.5-turbo"
+ },
+ "anthropic_claude": {
+ "connector": "anthropic",
+ "max_tokens": 200000,
+ "cost_per_1k_tokens": 0.015,
+ "cost_per_1k_tokens_output": 0.075,
+ "speed_rating": 7,
+ "quality_rating": 10,
+ "supports_images": True,
+ "supports_documents": True,
+ "context_length": 200000,
+ "model_name": "claude-3-sonnet-20240229"
+ }
+}
+
class AiCalls:
- """Interface for AI service interactions"""
+ """Interface for AI service interactions with centralized call method"""
def __init__(self):
self.openaiService = AiOpenai()
self.anthropicService = AiAnthropic()
+ self.document_extractor = DocumentExtraction()
+
+ async def callAi(
+ self,
+ prompt: str,
+ documents: List[ChatDocument] = None,
+ operation_type: str = "general",
+ priority: str = "balanced", # "speed", "quality", "cost", "balanced"
+ compress_prompt: bool = True,
+ compress_documents: bool = True,
+ process_documents_individually: bool = False,
+ max_cost: float = None,
+ max_processing_time: int = None
+ ) -> str:
+ """
+ Zentrale AI Call Methode mit intelligenter Modell-Auswahl und Content-Verarbeitung.
+ Args:
+ prompt: Der Hauptprompt für die AI
+ documents: Liste von Dokumenten zur Verarbeitung
+ operation_type: Art der Operation ("general", "document_analysis", "image_analysis", etc.)
+ priority: Priorität für Modell-Auswahl ("speed", "quality", "cost", "balanced")
+ compress_prompt: Ob der Prompt komprimiert werden soll
+ compress_documents: Ob Dokumente komprimiert werden sollen
+ process_documents_individually: Ob Dokumente einzeln verarbeitet werden sollen
+ max_cost: Maximale Kosten für den Call
+ max_processing_time: Maximale Verarbeitungszeit in Sekunden
+
+ Returns:
+ AI Response als String
+ """
+ try:
+ # 1. Dokumente verarbeiten falls vorhanden
+ document_content = ""
+ if documents:
+ document_content = await self._process_documents_for_ai(
+ documents,
+ operation_type,
+ compress_documents,
+ process_documents_individually
+ )
+
+ # 2. Bestes Modell basierend auf Priorität und Content auswählen
+ selected_model = self._select_optimal_model(
+ prompt,
+ document_content,
+ priority,
+ operation_type,
+ max_cost,
+ max_processing_time
+ )
+
+ # 3. Content für das gewählte Modell optimieren
+ optimized_prompt, optimized_content = await self._optimize_content_for_model(
+ prompt,
+ document_content,
+ selected_model,
+ compress_prompt,
+ compress_documents
+ )
+
+ # 4. AI Call mit Failover ausführen
+ return await self._execute_ai_call_with_failover(
+ selected_model,
+ optimized_prompt,
+ optimized_content
+ )
+
+ except Exception as e:
+ logger.error(f"Error in centralized AI call: {str(e)}")
+ return f"Error: {str(e)}"
+
+ def _select_optimal_model(
+ self,
+ prompt: str,
+ document_content: str,
+ priority: str,
+ operation_type: str,
+ max_cost: float = None,
+ max_processing_time: int = None
+ ) -> str:
+ """Wählt das optimale Modell basierend auf Priorität und Content aus"""
+
+ # Content-Größe berechnen
+ total_content_size = len(prompt.encode('utf-8')) + len(document_content.encode('utf-8'))
+
+ # Verfügbare Modelle filtern
+ available_models = {}
+ for model_name, model_info in AI_MODELS.items():
+ # Prüfe ob Modell für Content-Größe geeignet ist
+ if total_content_size > model_info["context_length"] * 0.8: # 80% für Content
+ continue
+
+ # Prüfe Kosten-Limit
+ if max_cost:
+ estimated_cost = self._estimate_cost(model_info, total_content_size)
+ if estimated_cost > max_cost:
+ continue
+
+ # Prüfe Operation-Type Kompatibilität
+ if operation_type == "image_analysis" and not model_info["supports_images"]:
+ continue
+
+ available_models[model_name] = model_info
+
+ if not available_models:
+ # Fallback zum kleinsten Modell
+ return "openai_gpt35"
+
+ # Modell basierend auf Priorität auswählen
+ if priority == "speed":
+ return max(available_models.keys(), key=lambda x: available_models[x]["speed_rating"])
+ elif priority == "quality":
+ return max(available_models.keys(), key=lambda x: available_models[x]["quality_rating"])
+ elif priority == "cost":
+ return min(available_models.keys(), key=lambda x: available_models[x]["cost_per_1k_tokens"])
+ else: # balanced
+ # Gewichtete Bewertung: 40% Qualität, 30% Geschwindigkeit, 30% Kosten
+ def balanced_score(model_name):
+ model_info = available_models[model_name]
+ quality_score = model_info["quality_rating"] * 0.4
+ speed_score = model_info["speed_rating"] * 0.3
+ cost_score = (10 - (model_info["cost_per_1k_tokens"] * 1000)) * 0.3 # Niedrigere Kosten = höherer Score
+ return quality_score + speed_score + cost_score
+
+ return max(available_models.keys(), key=balanced_score)
+
+ def _estimate_cost(self, model_info: Dict, content_size: int) -> float:
+ """Schätzt die Kosten für einen AI Call"""
+ # Grobe Schätzung: 1 Token ≈ 4 Zeichen
+ estimated_tokens = content_size / 4
+ input_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens"]
+ output_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens_output"] * 0.1 # 10% für Output
+ return input_cost + output_cost
+
+ async def _process_documents_for_ai(
+ self,
+ documents: List[ChatDocument],
+ operation_type: str,
+ compress_documents: bool,
+ process_individually: bool
+ ) -> str:
+ """Verarbeitet Dokumente für AI Call mit documentExtraction.py"""
+
+ if not documents:
+ return ""
+
+ processed_contents = []
+
+ for doc in documents:
+ try:
+ # Extrahiere Content mit documentExtraction.py
+ extracted = await self.document_extractor.processFileData(
+ doc.fileData,
+ doc.fileName,
+ doc.mimeType,
+ prompt=f"Extract relevant content for {operation_type}",
+ documentId=doc.id,
+ enableAI=True
+ )
+
+ # Kombiniere alle Content-Items
+ doc_content = []
+ for content_item in extracted.contents:
+ if content_item.data and content_item.data.strip():
+ doc_content.append(content_item.data)
+
+ if doc_content:
+ combined_doc_content = "\n\n".join(doc_content)
+
+ # Komprimiere falls gewünscht
+ if compress_documents and len(combined_doc_content.encode('utf-8')) > 10000: # 10KB Limit
+ combined_doc_content = await self._compress_content(
+ combined_doc_content,
+ 10000,
+ "document"
+ )
+
+ processed_contents.append(f"Document: {doc.fileName}\n{combined_doc_content}")
+
+ except Exception as e:
+ logger.warning(f"Error processing document {doc.fileName}: {str(e)}")
+ processed_contents.append(f"Document: {doc.fileName}\n[Error processing document: {str(e)}]")
+
+ return "\n\n---\n\n".join(processed_contents)
+
+ async def _optimize_content_for_model(
+ self,
+ prompt: str,
+ document_content: str,
+ model_name: str,
+ compress_prompt: bool,
+ compress_documents: bool
+ ) -> tuple[str, str]:
+ """Optimiert Content für das gewählte Modell"""
+
+ model_info = AI_MODELS[model_name]
+ max_content_size = model_info["context_length"] * 0.7 # 70% für Content
+
+ optimized_prompt = prompt
+ optimized_content = document_content
+
+ # Prompt komprimieren falls gewünscht
+ if compress_prompt and len(prompt.encode('utf-8')) > 2000: # 2KB Limit für Prompt
+ optimized_prompt = await self._compress_content(prompt, 2000, "prompt")
+
+ # Dokument-Content komprimieren falls gewünscht
+ if compress_documents and document_content:
+ content_size = len(document_content.encode('utf-8'))
+ if content_size > max_content_size:
+ optimized_content = await self._compress_content(
+ document_content,
+ int(max_content_size),
+ "document"
+ )
+
+ return optimized_prompt, optimized_content
+
+ async def _compress_content(self, content: str, target_size: int, content_type: str) -> str:
+ """Komprimiert Content intelligent basierend auf Typ"""
+
+ if len(content.encode('utf-8')) <= target_size:
+ return content
+
+ try:
+ # Verwende AI für intelligente Kompression
+ compression_prompt = f"""
+ Komprimiere den folgenden {content_type} auf maximal {target_size} Zeichen,
+ behalte aber alle wichtigen Informationen bei:
+
+ {content}
+
+ Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen.
+ """
+
+ # Verwende das schnellste verfügbare Modell für Kompression
+ compression_model = "openai_gpt35"
+ model_info = AI_MODELS[compression_model]
+ connector = getattr(self, f"{model_info['connector']}Service")
+
+ messages = [{"role": "user", "content": compression_prompt}]
+
+ if model_info["connector"] == "openai":
+ compressed = await connector.callAiBasic(messages)
+ else:
+ response = await connector.callAiBasic(messages)
+ compressed = response["choices"][0]["message"]["content"]
+
+ return compressed
+
+ except Exception as e:
+ logger.warning(f"AI compression failed, using truncation: {str(e)}")
+ # Fallback: Einfache Truncation
+ return content[:target_size] + "... [truncated]"
+
+ async def _execute_ai_call_with_failover(
+ self,
+ model_name: str,
+ prompt: str,
+ document_content: str
+ ) -> str:
+ """Führt AI Call mit automatischem Failover aus"""
+
+ try:
+ model_info = AI_MODELS[model_name]
+ connector = getattr(self, f"{model_info['connector']}Service")
+
+ # Messages vorbereiten
+ messages = []
+ if document_content:
+ messages.append({
+ "role": "system",
+ "content": f"Context from documents:\n{document_content}"
+ })
+
+ messages.append({
+ "role": "user",
+ "content": prompt
+ })
+
+ # AI Call ausführen
+ if model_info["connector"] == "openai":
+ return await connector.callAiBasic(messages)
+ else: # anthropic
+ response = await connector.callAiBasic(messages)
+ return response["choices"][0]["message"]["content"]
+
+ except ContextLengthExceededException:
+ logger.warning(f"Context length exceeded for {model_name}, trying fallback")
+ # Fallback zu Modell mit größerem Context
+ fallback_model = self._find_fallback_model(model_name)
+ if fallback_model:
+ return await self._execute_ai_call_with_failover(fallback_model, prompt, document_content)
+ else:
+ # Letzter Ausweg: Content weiter komprimieren
+ compressed_prompt = await self._compress_content(prompt, 1000, "prompt")
+ compressed_content = await self._compress_content(document_content, 5000, "document")
+ return await self._execute_ai_call_with_failover("openai_gpt35", compressed_prompt, compressed_content)
+
+ except Exception as e:
+ logger.warning(f"AI call failed with {model_name}: {e}")
+ # Allgemeiner Fallback
+ return await self._execute_ai_call_with_failover("openai_gpt35", prompt, document_content)
+
+ def _find_fallback_model(self, current_model: str) -> Optional[str]:
+ """Findet ein Fallback-Modell mit größerem Context"""
+ current_context = AI_MODELS[current_model]["context_length"]
+
+ # Suche Modell mit größerem Context
+ for model_name, model_info in AI_MODELS.items():
+ if model_info["context_length"] > current_context:
+ return model_name
+
+ return None
+
+ # Legacy methods
+
async def callAiTextBasic(self, prompt: str, context: Optional[str] = None) -> str:
"""
- Basic text processing using OpenAI.
+ Basic text processing - now uses centralized AI call method.
Args:
prompt: The user prompt to process
@@ -23,100 +381,47 @@ class AiCalls:
Returns:
The AI response as text
"""
- # Prepare messages in OpenAI format
- messages = []
-
- # Add system message if context provided
+ # Combine context with prompt if provided
+ full_prompt = prompt
if context:
- messages.append({
- "role": "system",
- "content": context
- })
+ full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
- # Add user message
- messages.append({
- "role": "user",
- "content": prompt
- })
-
- # Add language instruction for user-facing responses
- if hasattr(self, 'userLanguage') and self.userLanguage:
- ltext = f"Please respond in '{self.userLanguage}' language."
- if messages and messages[0]["role"] == "system":
- if "language" not in messages[0]["content"].lower():
- messages[0]["content"] = f"{ltext} {messages[0]['content']}"
- else:
- messages.insert(0, {
- "role": "system",
- "content": ltext
- })
-
- try:
- return await self.openaiService.callAiBasic(messages)
- except ContextLengthExceededException as e:
- logger.warning(f"OpenAI context length exceeded, falling back to Anthropic: {str(e)}")
- # Fallback to Anthropic (AI Advanced) when context length is exceeded
- return await self.callAiTextAdvanced(prompt, context, _is_fallback=True)
- except Exception as e:
- logger.error(f"Error in OpenAI call: {str(e)}")
- return f"Error: {str(e)}"
+ # Use centralized AI call with speed priority for basic calls
+ return await self.callAi(
+ prompt=full_prompt,
+ priority="speed",
+ compress_prompt=True,
+ compress_documents=False
+ )
async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str:
"""
- Advanced text processing using Anthropic.
- Fallback to OpenAI if Anthropic is overloaded or rate-limited.
+ Advanced text processing - now uses centralized AI call method.
Args:
prompt: The user prompt to process
context: Optional system context/prompt
- _is_fallback: Internal flag to prevent infinite recursion
+ _is_fallback: Internal flag (kept for compatibility)
+
+ Returns:
+ The AI response as text
"""
- # For Anthropic, we need to handle system content differently
- # Anthropic expects system content in a top-level parameter, not as a message role
- try:
- # Create messages without system role for Anthropic
- anthropic_messages = []
- if hasattr(self, 'userLanguage') and self.userLanguage:
- ltext = f"Please respond in '{self.userLanguage}' language."
- if context:
- # Combine context and language instruction
- full_context = f"{ltext}\n\n{context}"
- else:
- full_context = ltext
- else:
- full_context = context
-
- # Add user message
- anthropic_messages.append({
- "role": "user",
- "content": prompt
- })
-
- # Call Anthropic - let the connector handle system content conversion
- if full_context:
- # Send context as part of the user message for Anthropic
- enhanced_prompt = f"Context:\n{full_context}\n\nUser Request:\n{prompt}"
- response = await self.anthropicService.callAiBasic([
- {"role": "user", "content": enhanced_prompt}
- ])
- else:
- response = await self.anthropicService.callAiBasic(anthropic_messages)
-
- return response["choices"][0]["message"]["content"]
- except Exception as e:
- err_str = str(e)
- logger.warning(f"[UI NOTICE] Advanced AI failed, falling back to Basic AI (OpenAI). Reason: {err_str}")
- # Fallback to OpenAI basic, but only if we're not already in a fallback
- if not _is_fallback:
- return await self.callAiTextBasic(prompt, context)
- else:
- # If we're already in a fallback, return error to prevent infinite recursion
- logger.error("Both AI services failed, cannot provide fallback")
- return f"Error: Both AI services failed. Anthropic error: {err_str}"
+ # Combine context with prompt if provided
+ full_prompt = prompt
+ if context:
+ full_prompt = f"Context: {context}\n\nUser Request: {prompt}"
+
+ # Use centralized AI call with quality priority for advanced calls
+ return await self.callAi(
+ prompt=full_prompt,
+ priority="quality",
+ compress_prompt=False,
+ compress_documents=False
+ )
async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
"""
- Basic image processing using OpenAI.
+ Basic image processing - now uses centralized AI call method.
Args:
prompt: The prompt for image analysis
@@ -127,6 +432,8 @@ class AiCalls:
The AI response as text
"""
try:
+ # For image processing, use the original connector directly
+ # as the centralized method doesn't handle images yet
return await self.openaiService.callAiImage(prompt, imageData, mimeType)
except Exception as e:
logger.error(f"Error in OpenAI image call: {str(e)}")
@@ -134,7 +441,7 @@ class AiCalls:
async def callAiImageAdvanced(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
"""
- Advanced image processing using Anthropic.
+ Advanced image processing - now uses centralized AI call method.
Args:
prompt: The prompt for image analysis
@@ -145,8 +452,76 @@ class AiCalls:
The AI response as text
"""
try:
+ # For image processing, use the original connector directly
+ # as the centralized method doesn't handle images yet
return await self.anthropicService.callAiImage(prompt, imageData, mimeType)
except Exception as e:
logger.error(f"Error in Anthropic image call: {str(e)}")
return f"Error: {str(e)}"
+
+ # Convenience methods for common use cases
+
+ async def callAiForDocumentAnalysis(
+ self,
+ prompt: str,
+ documents: List[ChatDocument],
+ priority: str = "balanced"
+ ) -> str:
+ """Convenience method for document analysis"""
+ return await self.callAi(
+ prompt=prompt,
+ documents=documents,
+ operation_type="document_analysis",
+ priority=priority,
+ compress_documents=True,
+ process_documents_individually=False
+ )
+
+ async def callAiForReportGeneration(
+ self,
+ prompt: str,
+ documents: List[ChatDocument],
+ priority: str = "quality"
+ ) -> str:
+ """Convenience method for report generation"""
+ return await self.callAi(
+ prompt=prompt,
+ documents=documents,
+ operation_type="report_generation",
+ priority=priority,
+ compress_documents=True,
+ process_documents_individually=True
+ )
+
+ async def callAiForEmailComposition(
+ self,
+ prompt: str,
+ documents: List[ChatDocument] = None,
+ priority: str = "speed"
+ ) -> str:
+ """Convenience method for email composition"""
+ return await self.callAi(
+ prompt=prompt,
+ documents=documents,
+ operation_type="email_composition",
+ priority=priority,
+ compress_prompt=True,
+ compress_documents=True
+ )
+
+ async def callAiForTaskPlanning(
+ self,
+ prompt: str,
+ documents: List[ChatDocument] = None,
+ priority: str = "balanced"
+ ) -> str:
+ """Convenience method for task planning"""
+ return await self.callAi(
+ prompt=prompt,
+ documents=documents,
+ operation_type="task_planning",
+ priority=priority,
+ compress_prompt=False,
+ compress_documents=True
+ )
diff --git a/modules/interfaces/interfaceAiEngine.py b/modules/interfaces/interfaceAiEngine.py
deleted file mode 100644
index 2ccdd70e..00000000
--- a/modules/interfaces/interfaceAiEngine.py
+++ /dev/null
@@ -1,115 +0,0 @@
-"""
-Centralized AI Engine Interface for intelligent content processing
-"""
-
-from abc import ABC, abstractmethod
-from typing import List, Dict, Any, Optional, Union, Tuple
-from enum import Enum
-from dataclasses import dataclass
-from modules.interfaces.interfaceChatModel import ChatDocument, ExtractedContent
-
-
-class AIModelType(Enum):
- """Available AI model types"""
- OPENAI_GPT4 = "openai_gpt4"
- OPENAI_GPT35 = "openai_gpt35"
- ANTHROPIC_CLAUDE = "anthropic_claude"
- OPENAI_VISION = "openai_vision"
- ANTHROPIC_VISION = "anthropic_vision"
-
-
-class ProcessingStrategy(Enum):
- """Content processing strategies"""
- SINGLE_CALL = "single_call" # One AI call with full content
- DOCUMENT_BY_DOCUMENT = "doc_by_doc" # One call per document, merge results
- CHUNKED_PROCESSING = "chunked" # Process in chunks, merge results
- SUMMARIZED_CONTENT = "summarized" # Summarize content first, then process
-
-
-class ContentReductionStrategy(Enum):
- """Content reduction strategies"""
- REDUCE_DOCUMENTS_ONLY = "reduce_docs" # Keep prompt, reduce documents
- REDUCE_PROMPT_AND_DOCS = "reduce_both" # Reduce both prompt and documents
- SUMMARIZE_DOCUMENTS = "summarize_docs" # Summarize documents to key points
- EXTRACT_KEY_INFO = "extract_key" # Extract only relevant information
-
-
-@dataclass
-class AIRequest:
- """Standardized AI request structure"""
- prompt: str
- documents: List[ChatDocument]
- context: Optional[str] = None
- preferred_model: Optional[AIModelType] = None
- max_tokens: Optional[int] = None
- temperature: Optional[float] = None
- processing_strategy: Optional[ProcessingStrategy] = None
- reduction_strategy: Optional[ContentReductionStrategy] = None
- metadata: Optional[Dict[str, Any]] = None
-
-
-@dataclass
-class AIResponse:
- """Standardized AI response structure"""
- success: bool
- content: str
- model_used: AIModelType
- processing_strategy: ProcessingStrategy
- tokens_used: Optional[int] = None
- processing_time: Optional[float] = None
- error: Optional[str] = None
- metadata: Optional[Dict[str, Any]] = None
-
-
-@dataclass
-class ModelCapabilities:
- """AI model capabilities and limits"""
- max_tokens: int
- max_input_tokens: int
- supports_vision: bool
- supports_function_calling: bool
- cost_per_1k_tokens: float
- processing_speed: str # "fast", "medium", "slow"
-
-
-class AIEngine(ABC):
- """Abstract AI Engine interface"""
-
- @abstractmethod
- async def process_request(self, request: AIRequest) -> AIResponse:
- """Process an AI request with intelligent content management"""
- pass
-
- @abstractmethod
- def get_model_capabilities(self, model: AIModelType) -> ModelCapabilities:
- """Get capabilities and limits for a specific model"""
- pass
-
- @abstractmethod
- async def estimate_token_usage(self, request: AIRequest) -> int:
- """Estimate token usage for a request"""
- pass
-
-
-class ContentReducer(ABC):
- """Abstract content reduction interface"""
-
- @abstractmethod
- async def reduce_content(
- self,
- documents: List[ChatDocument],
- prompt: str,
- strategy: ContentReductionStrategy,
- target_reduction: float = 0.5
- ) -> Tuple[List[ChatDocument], str]:
- """Reduce content size while preserving important information"""
- pass
-
- @abstractmethod
- async def summarize_document(
- self,
- document: ChatDocument,
- focus_prompt: str
- ) -> ChatDocument:
- """Create a summary of a document focused on specific aspects"""
- pass