gateway/modules/interfaces/interfaceAiObjects.py
2025-09-26 23:36:56 +02:00

513 lines
20 KiB
Python

import logging
from typing import Dict, Any, List, Union
from dataclasses import dataclass
from modules.connectors.connectorAiOpenai import AiOpenai
from modules.connectors.connectorAiAnthropic import AiAnthropic
from modules.connectors.connectorAiLangdoc import AiLangdoc
from modules.connectors.connectorAiTavily import ConnectorWeb
from modules.datamodels.datamodelAi import (
AiCallOptions,
AiCallRequest,
AiCallResponse,
OperationType,
ProcessingMode,
Priority,
ModelTags,
OPERATION_TAG_MAPPING,
PROCESSING_MODE_PRIORITY_MAPPING
)
from modules.datamodels.datamodelWeb import (
WebCrawlActionResult,
WebCrawlActionDocument,
WebCrawlDocumentData,
WebCrawlRequest,
WebCrawlResultItem,
WebScrapeActionResult,
WebScrapeActionDocument,
WebSearchDocumentData as WebScrapeDocumentData,
WebScrapeRequest,
WebScrapeResultItem,
WebSearchActionResult,
WebSearchActionDocument,
WebSearchDocumentData,
WebSearchRequest,
WebSearchResultItem,
)
from modules.datamodels.datamodelWorkflow import ActionDocument
logger = logging.getLogger(__name__)

# Comprehensive model registry with capability tags and function mapping.
#
# Each key is a registry name of the form "<connector>_<function>[_variant]".
# Each value describes one callable endpoint; the fields are consumed by
# AiObjects as follows:
#   connector             - which service object handles the call
#                           (see AiObjects._connectorFor).
#   function              - name of the connector method that call(),
#                           callImage() and generateImage() dispatch on.
#   llmName               - model identifier string; not read by any code
#                           in this module.
#   contextLength         - size limit checked in AiObjects._selectModel;
#                           0 disables the check for that entry.
#   costPer1kTokens       - input price used by AiObjects._estimateCost and
#                           by the COST / BALANCED selection priorities.
#   costPer1kTokensOutput - output price used by AiObjects._estimateCost.
#   speedRating           - compared by the SPEED / BALANCED priorities.
#   qualityRating         - compared by the QUALITY / BALANCED priorities.
#   capabilities          - searched by AiObjects.getModelsByCapability.
#   tags                  - matched against the required tags in
#                           AiObjects._selectModel and searched by
#                           AiObjects.getModelsByTag.
# NOTE(review): prices and ratings are hard-coded here; presumably they are
# maintained by hand - confirm they are still current before relying on them.
aiModels: Dict[str, Dict[str, Any]] = {
    # OpenAI Models
    "openai_callAiBasic": {
        "connector": "openai",
        "function": "callAiBasic",
        "llmName": "gpt-4o",
        "contextLength": 128000,
        "costPer1kTokens": 0.03,
        "costPer1kTokensOutput": 0.06,
        "speedRating": 8,
        "qualityRating": 9,
        "capabilities": ["text_generation", "chat", "reasoning"],
        "tags": ["text", "chat", "reasoning", "general"]
    },
    "openai_callAiBasic_gpt35": {
        "connector": "openai",
        "function": "callAiBasic",
        "llmName": "gpt-3.5-turbo",
        "contextLength": 16000,
        "costPer1kTokens": 0.0015,
        "costPer1kTokensOutput": 0.002,
        "speedRating": 9,
        "qualityRating": 7,
        "capabilities": ["text_generation", "chat", "reasoning"],
        "tags": ["text", "chat", "reasoning", "general", "fast"]
    },
    "openai_callAiImage": {
        "connector": "openai",
        "function": "callAiImage",
        "llmName": "gpt-4o",
        "contextLength": 128000,
        "costPer1kTokens": 0.03,
        "costPer1kTokensOutput": 0.06,
        "speedRating": 7,
        "qualityRating": 9,
        "capabilities": ["image_analysis", "vision", "multimodal"],
        "tags": ["image", "vision", "multimodal"]
    },
    "openai_generateImage": {
        "connector": "openai",
        "function": "generateImage",
        "llmName": "dall-e-3",
        "contextLength": 0,
        "costPer1kTokens": 0.04,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 6,
        "qualityRating": 9,
        "capabilities": ["image_generation", "art", "visual_creation"],
        "tags": ["image_generation", "art", "visual"]
    },
    # Anthropic Models
    "anthropic_callAiBasic": {
        "connector": "anthropic",
        "function": "callAiBasic",
        "llmName": "claude-3-5-sonnet-20241022",
        "contextLength": 200000,
        "costPer1kTokens": 0.015,
        "costPer1kTokensOutput": 0.075,
        "speedRating": 7,
        "qualityRating": 10,
        "capabilities": ["text_generation", "chat", "reasoning", "analysis"],
        "tags": ["text", "chat", "reasoning", "analysis", "high_quality"]
    },
    "anthropic_callAiImage": {
        "connector": "anthropic",
        "function": "callAiImage",
        "llmName": "claude-3-5-sonnet-20241022",
        "contextLength": 200000,
        "costPer1kTokens": 0.015,
        "costPer1kTokensOutput": 0.075,
        "speedRating": 7,
        "qualityRating": 10,
        "capabilities": ["image_analysis", "vision", "multimodal"],
        "tags": ["image", "vision", "multimodal", "high_quality"]
    },
    # LangDoc Models
    "langdoc_callAiBasic": {
        "connector": "langdoc",
        "function": "callAiBasic",
        "llmName": "gpt-4o",
        "contextLength": 128000,
        "costPer1kTokens": 0.02,
        "costPer1kTokensOutput": 0.04,
        "speedRating": 8,
        "qualityRating": 9,
        "capabilities": ["text_generation", "chat", "reasoning"],
        "tags": ["text", "chat", "reasoning", "general", "cost_effective"]
    },
    "langdoc_callAiImage": {
        "connector": "langdoc",
        "function": "callAiImage",
        "llmName": "gpt-4o",
        "contextLength": 128000,
        "costPer1kTokens": 0.02,
        "costPer1kTokensOutput": 0.04,
        "speedRating": 7,
        "qualityRating": 9,
        "capabilities": ["image_analysis", "vision", "multimodal"],
        "tags": ["image", "vision", "multimodal", "cost_effective"]
    },
    "langdoc_generateImage": {
        "connector": "langdoc",
        "function": "generateImage",
        "llmName": "dall-e-3",
        "contextLength": 0,
        "costPer1kTokens": 0.04,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 6,
        "qualityRating": 9,
        "capabilities": ["image_generation", "art", "visual_creation"],
        "tags": ["image_generation", "art", "visual", "cost_effective"]
    },
    "langdoc_generateImageWithVariations": {
        "connector": "langdoc",
        "function": "generateImageWithVariations",
        "llmName": "dall-e-3",
        "contextLength": 0,
        "costPer1kTokens": 0.04,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 5,
        "qualityRating": 9,
        "capabilities": ["image_generation", "art", "visual_creation", "variations"],
        "tags": ["image_generation", "art", "visual", "variations", "cost_effective"]
    },
    "langdoc_generateImageWithChat": {
        "connector": "langdoc",
        "function": "generateImageWithChat",
        "llmName": "gpt-4o",
        "contextLength": 128000,
        "costPer1kTokens": 0.02,
        "costPer1kTokensOutput": 0.04,
        "speedRating": 6,
        "qualityRating": 8,
        "capabilities": ["image_generation", "chat", "visual_creation"],
        "tags": ["image_generation", "chat", "visual", "cost_effective"]
    },
    # LangDoc API-info endpoints (no token cost, no context limit)
    "langdoc_listModels": {
        "connector": "langdoc",
        "function": "listModels",
        "llmName": "api",
        "contextLength": 0,
        "costPer1kTokens": 0.0,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 9,
        "qualityRating": 5,
        "capabilities": ["model_listing", "api_info"],
        "tags": ["api", "info", "models"]
    },
    "langdoc_getModelInfo": {
        "connector": "langdoc",
        "function": "getModelInfo",
        "llmName": "api",
        "contextLength": 0,
        "costPer1kTokens": 0.0,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 9,
        "qualityRating": 5,
        "capabilities": ["model_info", "api_info"],
        "tags": ["api", "info", "models"]
    },
    # Tavily Web Models
    "tavily_search": {
        "connector": "tavily",
        "function": "search",
        "llmName": "tavily-search",
        "contextLength": 0,
        "costPer1kTokens": 0.0,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 8,
        "qualityRating": 8,
        "capabilities": ["web_search", "information_retrieval", "url_discovery"],
        "tags": ["web", "search", "urls", "information"]
    },
    "tavily_crawl": {
        "connector": "tavily",
        "function": "crawl",
        "llmName": "tavily-extract",
        "contextLength": 0,
        "costPer1kTokens": 0.0,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 6,
        "qualityRating": 8,
        "capabilities": ["web_crawling", "content_extraction", "text_extraction"],
        "tags": ["web", "crawl", "extract", "content"]
    },
    "tavily_scrape": {
        "connector": "tavily",
        "function": "scrape",
        "llmName": "tavily-search-extract",
        "contextLength": 0,
        "costPer1kTokens": 0.0,
        "costPer1kTokensOutput": 0.0,
        "speedRating": 6,
        "qualityRating": 8,
        "capabilities": ["web_search", "web_crawling", "content_extraction", "information_retrieval"],
        "tags": ["web", "search", "crawl", "extract", "content", "information"]
    }
}
@dataclass(slots=True)
class AiObjects:
"""Centralized AI interface: selects model and calls connector. Includes web functionality."""
openaiService: AiOpenai
anthropicService: AiAnthropic
langdocService: AiLangdoc
tavilyService: ConnectorWeb
def __post_init__(self) -> None:
if self.openaiService is None:
raise TypeError("openaiService must be provided")
if self.anthropicService is None:
raise TypeError("anthropicService must be provided")
if self.langdocService is None:
raise TypeError("langdocService must be provided")
if self.tavilyService is None:
raise TypeError("tavilyService must be provided")
@classmethod
async def create(cls) -> "AiObjects":
"""Create AiObjects instance with all connectors initialized."""
openaiService = AiOpenai()
anthropicService = AiAnthropic()
langdocService = AiLangdoc()
tavilyService = await ConnectorWeb.create()
return cls(
openaiService=openaiService,
anthropicService=anthropicService,
langdocService=langdocService,
tavilyService=tavilyService
)
def _estimateCost(self, modelInfo: Dict[str, Any], contentSize: int) -> float:
estimatedTokens = contentSize / 4
inputCost = (estimatedTokens / 1000) * modelInfo["costPer1kTokens"]
outputCost = (estimatedTokens / 1000) * modelInfo["costPer1kTokensOutput"] * 0.1
return inputCost + outputCost
def _selectModel(self, prompt: str, context: str, options: AiCallOptions) -> str:
"""Select the best model based on operation type, tags, and requirements."""
totalSize = len(prompt.encode("utf-8")) + len(context.encode("utf-8"))
candidates: Dict[str, Dict[str, Any]] = {}
# Determine required tags from operation type
requiredTags = options.requiredTags
if not requiredTags:
requiredTags = OPERATION_TAG_MAPPING.get(options.operationType, [ModelTags.TEXT, ModelTags.CHAT])
# Override priority based on processing mode if not explicitly set
effectivePriority = options.priority
if options.priority == Priority.BALANCED:
effectivePriority = PROCESSING_MODE_PRIORITY_MAPPING.get(options.processingMode, Priority.BALANCED)
logger.info(f"Model selection - Operation: {options.operationType}, Required tags: {requiredTags}, Priority: {effectivePriority}")
for name, info in aiModels.items():
# Check context length
if info["contextLength"] > 0 and totalSize > info["contextLength"] * 0.8:
continue
# Check cost constraints
if options.maxCost is not None:
if self._estimateCost(info, totalSize) > options.maxCost:
continue
# Check required tags/capabilities
modelTags = info.get("tags", [])
if requiredTags and not any(tag in modelTags for tag in requiredTags):
continue
# Check processing mode requirements
if options.processingMode == ProcessingMode.DETAILED and ModelTags.FAST in modelTags:
# Skip fast models for detailed processing
continue
candidates[name] = info
if not candidates:
# Fallback based on operation type
if options.operationType == OperationType.IMAGE_ANALYSIS:
return "openai_callAiImage"
elif options.operationType == OperationType.IMAGE_GENERATION:
return "openai_generateImage"
elif options.operationType == OperationType.WEB_RESEARCH:
return "langdoc_callAiBasic"
else:
return "openai_callAiBasic_gpt35"
# Select based on priority
if effectivePriority == Priority.SPEED:
return max(candidates, key=lambda k: candidates[k]["speedRating"])
elif effectivePriority == Priority.QUALITY:
return max(candidates, key=lambda k: candidates[k]["qualityRating"])
elif effectivePriority == Priority.COST:
return min(candidates, key=lambda k: candidates[k]["costPer1kTokens"])
else: # BALANCED
def balancedScore(name: str) -> float:
info = candidates[name]
return info["qualityRating"] * 0.4 + info["speedRating"] * 0.3 + (10 - info["costPer1kTokens"] * 1000) * 0.3
return max(candidates, key=balancedScore)
def _connectorFor(self, modelName: str):
"""Get the appropriate connector for the model."""
connectorType = aiModels[modelName]["connector"]
if connectorType == "openai":
return self.openaiService
elif connectorType == "anthropic":
return self.anthropicService
elif connectorType == "langdoc":
return self.langdocService
elif connectorType == "tavily":
return self.tavilyService
else:
raise ValueError(f"Unknown connector type: {connectorType}")
async def call(self, request: AiCallRequest) -> AiCallResponse:
"""Call AI model for text generation."""
prompt = request.prompt
context = request.context or ""
options = request.options
# Compress optionally (prompt/context) - simple truncation fallback kept here
def maybeTruncate(text: str, limit: int) -> str:
data = text.encode("utf-8")
if len(data) <= limit:
return text
return data[:limit].decode("utf-8", errors="ignore") + "... [truncated]"
if options.compressPrompt and len(prompt.encode("utf-8")) > 2000:
prompt = maybeTruncate(prompt, 2000)
if options.compressContext and len(context.encode("utf-8")) > 70000:
context = maybeTruncate(context, 70000)
# Select model for text generation
modelName = self._selectModel(prompt, context, options)
messages: List[Dict[str, Any]] = []
if context:
messages.append({"role": "system", "content": f"Context from documents:\n{context}"})
messages.append({"role": "user", "content": prompt})
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
# Call the appropriate function
if functionName == "callAiBasic":
if aiModels[modelName]["connector"] == "openai":
content = await connector.callAiBasic(messages)
else:
response = await connector.callAiBasic(messages)
content = response["choices"][0]["message"]["content"]
else:
raise ValueError(f"Function {functionName} not supported for text generation")
# Estimate cost/tokens
totalSize = len((prompt + context).encode("utf-8"))
cost = self._estimateCost(aiModels[modelName], totalSize)
usedTokens = int(totalSize / 4)
return AiCallResponse(content=content, modelName=modelName, usedTokens=usedTokens, costEstimate=cost)
async def callImage(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None, options: AiCallOptions = None) -> str:
"""Call AI model for image analysis."""
if options is None:
options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
# Select model for image analysis
modelName = self._selectModel(prompt, "", options)
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
if functionName == "callAiImage":
return await connector.callAiImage(prompt, imageData, mimeType)
else:
raise ValueError(f"Function {functionName} not supported for image analysis")
async def generateImage(self, prompt: str, size: str = "1024x1024", quality: str = "standard", style: str = "vivid", options: AiCallOptions = None) -> Dict[str, Any]:
"""Generate an image using AI."""
if options is None:
options = AiCallOptions(operationType=OperationType.IMAGE_GENERATION)
# Select model for image generation
modelName = self._selectModel(prompt, "", options)
connector = self._connectorFor(modelName)
functionName = aiModels[modelName]["function"]
if functionName == "generateImage":
return await connector.generateImage(prompt, size, quality, style)
elif functionName == "generateImageWithVariations":
results = await connector.generateImageWithVariations(prompt, 1, size, quality, style)
return results[0] if results else {}
elif functionName == "generateImageWithChat":
content = await connector.generateImageWithChat(prompt, size, quality, style)
return {"content": content, "success": True}
else:
raise ValueError(f"Function {functionName} not supported for image generation")
# Web functionality methods
async def webSearch(self, web_search_request: WebSearchRequest) -> WebSearchActionResult:
"""Perform web search using Tavily."""
return await self.tavilyService.search(web_search_request)
async def webCrawl(self, web_crawl_request: WebCrawlRequest) -> WebCrawlActionResult:
"""Crawl web pages using Tavily."""
return await self.tavilyService.crawl(web_crawl_request)
async def webScrape(self, web_scrape_request: WebScrapeRequest) -> WebScrapeActionResult:
"""Scrape web content using Tavily."""
return await self.tavilyService.scrape(web_scrape_request)
async def webQuery(self, query: str, context: str = "", options: AiCallOptions = None) -> str:
"""Use LangDoc AI to provide the best answers for web-related queries."""
if options is None:
options = AiCallOptions(operationType=OperationType.WEB_RESEARCH)
# Create a comprehensive prompt for web queries
webPrompt = f"""You are an expert web researcher and information analyst. Please provide a comprehensive and accurate answer to the following web-related query.
Query: {query}
{f"Additional Context: {context}" if context else ""}
Please provide:
1. A clear, well-structured answer to the query
2. Key points and important details
3. Relevant insights and analysis
4. Any important considerations or caveats
5. Suggestions for further research if applicable
Format your response in a clear, professional manner that would be helpful for someone researching this topic."""
messages = [{"role": "user", "content": webPrompt}]
try:
# Use LangDoc for the best answers
response = await self.langdocService.callAiBasic(messages)
return response
except Exception as e:
logger.error(f"LangDoc web query failed: {str(e)}")
raise Exception(f"Failed to process web query: {str(e)}")
# Utility methods
async def listAvailableModels(self, connectorType: str = None) -> List[Dict[str, Any]]:
"""List available models, optionally filtered by connector type."""
if connectorType:
return [info for name, info in aiModels.items() if info["connector"] == connectorType]
return list(aiModels.values())
async def getModelInfo(self, modelName: str) -> Dict[str, Any]:
"""Get information about a specific model."""
if modelName not in aiModels:
raise ValueError(f"Model {modelName} not found")
return aiModels[modelName]
async def getModelsByCapability(self, capability: str) -> List[str]:
"""Get model names that support a specific capability."""
return [name for name, info in aiModels.items() if capability in info.get("capabilities", [])]
async def getModelsByTag(self, tag: str) -> List[str]:
"""Get model names that have a specific tag."""
return [name for name, info in aiModels.items() if tag in info.get("tags", [])]