from typing import Optional, List, Dict, Any, Callable, TYPE_CHECKING, Tuple
from pydantic import BaseModel, Field, ConfigDict
from enum import Enum

# Import ContentPart for runtime use (needed for Pydantic model rebuilding)
from modules.datamodels.datamodelExtraction import ContentPart

# Import JSON utilities for safe conversion
from modules.shared.jsonUtils import extractJsonString, tryParseJson, repairBrokenJson


# Operation Types
class OperationTypeEnum(str, Enum):
    """Closed set of AI operation types; str-valued so members serialize as plain strings."""
    # Planning Operation
    PLAN = "plan"
    # Data Operations
    DATA_ANALYSE = "dataAnalyse"
    DATA_GENERATE = "dataGenerate"
    DATA_EXTRACT = "dataExtract"
    # Image Operations
    IMAGE_ANALYSE = "imageAnalyse"
    IMAGE_GENERATE = "imageGenerate"
    # Web Operations
    WEB_SEARCH = "webSearch"  # Returns list of URLs only
    WEB_CRAWL = "webCrawl"    # Web crawl for a given URL


# Operation Type Rating - Helper class for capability ratings
class OperationTypeRating(BaseModel):
    """Represents an operation type with its capability rating (1-10)."""

    operationType: OperationTypeEnum = Field(description="The operation type")
    rating: int = Field(ge=1, le=10, description="Capability rating (1-10, higher = better for this operation type)")

    def __str__(self) -> str:
        # Compact "name(rating)" form, e.g. "webSearch(10)"
        return f"{self.operationType.value}({self.rating})"

    def __repr__(self) -> str:
        return f"OperationTypeRating({self.operationType.value}, {self.rating})"


# Helper function to create operation type ratings easily
def createOperationTypeRatings(*ratings: Tuple[OperationTypeEnum, int]) -> List[OperationTypeRating]:
    """
    Helper function to create operation type ratings easily.

    Usage:
        operationTypes = createOperationTypeRatings(
            (OperationTypeEnum.DATA_ANALYSE, 8),
            (OperationTypeEnum.WEB_SEARCH, 10),
            (OperationTypeEnum.WEB_CRAWL, 9)
        )

    Args:
        *ratings: (operationType, rating) tuples; each rating must be 1-10
                  (validated by OperationTypeRating).

    Returns:
        List of validated OperationTypeRating instances, in input order.
    """
    return [OperationTypeRating(operationType=ot, rating=rating) for ot, rating in ratings]


# Processing Modes
class ProcessingModeEnum(str, Enum):
    """How thoroughly a request should be processed."""
    BASIC = "basic"
    ADVANCED = "advanced"
    DETAILED = "detailed"


# Priority Levels
class PriorityEnum(str, Enum):
    """Optimization priority used when selecting a model."""
    SPEED = "speed"
    QUALITY = "quality"
    COST = "cost"
    BALANCED = "balanced"


# Model Capabilities - REMOVED: Not used in business logic


class AiModel(BaseModel):
    """Enhanced AI model definition with dynamic capabilities."""

    # Core identification
    name: str = Field(description="Actual LLM model name used for API calls")
    displayName: str = Field(description="Human-readable model name with module prefix")
    connectorType: str = Field(description="Type of connector (openai, anthropic, perplexity, tavily, etc.)")

    # API configuration
    apiUrl: str = Field(description="API endpoint URL for this model")
    temperature: float = Field(default=0.2, ge=0.0, le=2.0, description="Default temperature for this model")

    # Token and context limits
    maxTokens: int = Field(description="Maximum tokens this model can generate")
    contextLength: int = Field(description="Maximum context length this model can handle")

    # Cost information
    costPer1kTokensInput: float = Field(default=0.0, description="Cost per 1000 input tokens")
    costPer1kTokensOutput: float = Field(default=0.0, description="Cost per 1000 output tokens")

    # Performance ratings
    speedRating: int = Field(ge=1, le=10, description="Speed rating (1-10, higher = faster)")
    qualityRating: int = Field(ge=1, le=10, description="Quality rating (1-10, higher = better)")

    # Function reference (not serialized; excluded from dumps via exclude=True)
    functionCall: Optional[Callable] = Field(default=None, exclude=True, description="Function to call for this model")
    calculatePriceUsd: Optional[Callable] = Field(default=None, exclude=True, description="Function to calculate price in USD")

    # Selection criteria - capabilities with ratings
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Default priority for this model. See PriorityEnum for available values.")
    processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Default processing mode. See ProcessingModeEnum for available values.")
    # FIX: use default_factory instead of a mutable literal default ([])
    operationTypes: List[OperationTypeRating] = Field(default_factory=list, description="Operation types this model can handle with capability ratings (1-10)")
    minContextLength: Optional[int] = Field(default=None, description="Minimum context length required")
    isAvailable: bool = Field(default=True, description="Whether model is currently available")

    # Metadata
    version: Optional[str] = Field(default=None, description="Model version")
    lastUpdated: Optional[str] = Field(default=None, description="Last update timestamp")

    model_config = ConfigDict(arbitrary_types_allowed=True)  # Allow Callable type


class SelectionRule(BaseModel):
    """A rule for model selection."""

    name: str = Field(description="Rule name identifier")
    condition: str = Field(description="Description of when this rule applies")
    weight: float = Field(description="Weight for scoring (higher = more important)")
    operationTypes: List[OperationTypeEnum] = Field(description="Operation types this rule applies to")
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level for this rule")
    minQualityRating: Optional[int] = Field(default=None, description="Minimum quality rating")
    maxCost: Optional[float] = Field(default=None, description="Maximum cost threshold")
    minContextLength: Optional[int] = Field(default=None, description="Minimum context length required")


class AiCallOptions(BaseModel):
    """Options for centralized AI processing with clear operation types and tags."""

    operationType: OperationTypeEnum = Field(default=OperationTypeEnum.DATA_ANALYSE, description="Type of operation")
    priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level")
    compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
    compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
    processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately; else pool docs")
    maxCost: Optional[float] = Field(default=None, description="Max cost budget")
    maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
    processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode")
    resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
    safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits (0.0-0.5)")

    # Model generation parameters
    temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation (0.0-2.0, lower = more consistent)")
    maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch")


class AiCallRequest(BaseModel):
    """Centralized AI call request payload for interface use."""

    prompt: str = Field(description="The user prompt")
    context: Optional[str] = Field(default=None, description="Optional external context (e.g., extracted docs)")
    options: AiCallOptions = Field(default_factory=AiCallOptions)
    contentParts: Optional[List['ContentPart']] = None  # NEW: Content parts for model-aware chunking


class AiCallResponse(BaseModel):
    """Standardized AI call response."""

    content: str = Field(description="AI response content")
    modelName: str = Field(description="Selected model name")
    priceUsd: float = Field(default=0.0, description="Calculated price in USD")
    processingTime: float = Field(default=0.0, description="Duration in seconds")
    bytesSent: int = Field(default=0, description="Input data size in bytes")
    bytesReceived: int = Field(default=0, description="Output data size in bytes")
    errorCount: int = Field(default=0, description="0 for success, 1+ for errors")


class AiModelCall(BaseModel):
    """Standardized input for AI model calls."""

    messages: List[Dict[str, Any]] = Field(description="Messages in OpenAI format (role, content)")
    model: Optional[AiModel] = Field(default=None, description="The AI model being called")
    options: AiCallOptions = Field(default_factory=AiCallOptions, description="Additional model-specific options")

    model_config = ConfigDict(arbitrary_types_allowed=True)


class AiModelResponse(BaseModel):
    """Standardized output from AI model calls."""

    content: str = Field(description="The AI response content")
    success: bool = Field(default=True, description="Whether the call was successful")
    error: Optional[str] = Field(default=None, description="Error message if success=False")

    # Optional metadata that models can include
    modelId: Optional[str] = Field(default=None, description="Model identifier used")
    processingTime: Optional[float] = Field(default=None, description="Processing time in seconds")
    tokensUsed: Optional[Dict[str, int]] = Field(default=None, description="Token usage (input, output, total)")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional model-specific metadata")

    model_config = ConfigDict(arbitrary_types_allowed=True)


# Structured prompt models for specialized operations
class AiCallPromptWebSearch(BaseModel):
    """Structured prompt format for WEB_SEARCH operation - returns list of URLs."""

    instruction: str = Field(description="Search instruction/query for finding relevant URLs")
    country: Optional[str] = Field(default=None, description="Two-digit country code (lowercase, e.g., ch, us, de, fr)")
    maxNumberPages: Optional[int] = Field(default=10, description="Maximum number of pages to search (default: 10)")
    language: Optional[str] = Field(default=None, description="Language code (lowercase, e.g., de, en, fr)")
    researchDepth: Optional[str] = Field(default="general", description="Research depth: fast (maxDepth=1), general (maxDepth=2), deep (maxDepth=3)")


class AiCallPromptWebCrawl(BaseModel):
    """Structured prompt format for WEB_CRAWL operation - crawls ONE specific URL and returns content."""

    instruction: str = Field(description="Instruction for what content to extract from URL")
    url: str = Field(description="Single URL to crawl")
    maxDepth: Optional[int] = Field(default=2, description="Maximum number of hops from starting page (default: 2)")
    maxWidth: Optional[int] = Field(default=10, description="Maximum pages to crawl per level (default: 10)")


class AiCallPromptImage(BaseModel):
    """Structured prompt format for image generation."""

    prompt: str = Field(description="Text description of the image to generate")
    size: Optional[str] = Field(default="1024x1024", description="Image size (1024x1024, 1792x1024, 1024x1792)")
    quality: Optional[str] = Field(default="standard", description="Image quality (standard, hd)")
    style: Optional[str] = Field(default="vivid", description="Image style (vivid, natural)")


class DocumentData(BaseModel):
    """Single document in response."""

    documentName: str = Field(description="Document name")
    documentData: Any = Field(description="Document data (can be str, bytes, dict, etc.)")
    mimeType: str = Field(description="MIME type of the document")


class AiProcessParameters(BaseModel):
    """Parameters for AI processing action."""

    aiPrompt: str = Field(description="AI instruction prompt")
    contentParts: Optional[List[ContentPart]] = Field(
        None,
        description="Already-extracted content parts (required if documents need to be processed)"
    )
    resultType: str = Field(
        default="txt",
        description="Output file extension (txt, json, pdf, docx, xlsx, etc.)"
    )


class AiResponseMetadata(BaseModel):
    """Metadata for AI response (varies by operation type)."""

    # Document Generation Metadata
    title: Optional[str] = Field(None, description="Document title")
    filename: Optional[str] = Field(None, description="Document filename")

    # Operation-Specific Metadata
    operationType: Optional[str] = Field(None, description="Type of operation performed")
    schemaVersion: Optional[str] = Field(None, description="Schema version (e.g., 'parameters_v1')", alias="schema")
    extractionMethod: Optional[str] = Field(None, description="Method used for extraction")
    sourceDocuments: Optional[List[str]] = Field(None, description="Source document references")

    # Additional metadata (for extensibility)
    additionalData: Optional[Dict[str, Any]] = Field(None, description="Additional operation-specific metadata")

    @classmethod
    def fromDict(cls, data: Optional[Dict[str, Any]]) -> Optional["AiResponseMetadata"]:
        """Create AiResponseMetadata from dict with camelCase field names.

        Unknown keys are collected into additionalData rather than rejected.
        Returns None for non-dict/empty input or if validation fails
        (deliberate best-effort: bad metadata must not break the response).
        """
        if not data or not isinstance(data, dict):
            return None
        # "schema" (not "schemaVersion") is in knownFields because the field is aliased.
        knownFields = {"title", "filename", "operationType", "schema", "extractionMethod", "sourceDocuments", "additionalData"}
        mappedData = {k: v for k, v in data.items() if k in knownFields}
        additionalFields = {k: v for k, v in data.items() if k not in knownFields}
        if additionalFields:
            mappedData["additionalData"] = additionalFields
        try:
            return cls(**mappedData)
        except Exception:
            return None


class AiResponse(BaseModel):
    """Unified response from all AI calls (planning, text, documents)."""

    content: str = Field(description="Response content (JSON string for planning, text for analysis, unified JSON for documents)")
    metadata: Optional[AiResponseMetadata] = Field(
        None,
        description="Response metadata (varies by operation type)"
    )
    documents: Optional[List[DocumentData]] = Field(
        None,
        description="Generated documents (only for document generation operations)"
    )

    def toJson(self) -> Dict[str, Any]:
        """
        Convert AI response content to JSON using enhanced stabilizing failsafe conversion methods.
        Centralizes AI result to JSON conversion in one place.

        Returns:
            Dict containing the parsed JSON content, or a safe fallback structure if parsing fails.
        """
        if not self.content:
            return {}

        # Use enhanced stabilizing failsafe JSON conversion methods from jsonUtils
        # First, try to extract and parse JSON using the safe methods
        obj, err, _ = tryParseJson(self.content)  # cleaned text not needed here
        if err is None and isinstance(obj, dict):
            # Successfully parsed as dict
            return obj
        elif err is None and isinstance(obj, list):
            # Successfully parsed as list - wrap in dict for consistency
            return {"data": obj}

        # If parsing failed, try to repair broken JSON
        repaired = repairBrokenJson(self.content)
        if repaired is not None:
            return repaired

        # If all else fails, return a safe structure with the cleaned content
        # Extract JSON string even if it's not fully parseable
        extracted = extractJsonString(self.content)
        if extracted and extracted != self.content:
            # Try one more time with extracted string
            obj, err, _ = tryParseJson(extracted)
            if err is None and isinstance(obj, (dict, list)):
                return obj if isinstance(obj, dict) else {"data": obj}

        # Final fallback: return safe structure with raw content
        return {
            "content": self.content,
            "parseError": True
        }