gateway/modules/datamodels/datamodelAi.py
2025-12-15 21:55:26 +01:00

260 lines
13 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
from typing import Optional, List, Dict, Any, Callable, TYPE_CHECKING, Tuple
from pydantic import BaseModel, Field, ConfigDict
from enum import Enum
# Import ContentPart for runtime use (needed for Pydantic model rebuilding)
from modules.datamodels.datamodelExtraction import ContentPart
# Import JSON utilities for safe conversion
from modules.shared.jsonUtils import extractJsonString, tryParseJson, repairBrokenJson
# Operation Types
class OperationTypeEnum(str, Enum):
# Planning Operation
PLAN = "plan"
# Data Operations
DATA_ANALYSE = "dataAnalyse"
DATA_GENERATE = "dataGenerate"
DATA_EXTRACT = "dataExtract"
# Image Operations
IMAGE_ANALYSE = "imageAnalyse"
IMAGE_GENERATE = "imageGenerate"
# Web Operations
WEB_SEARCH = "webSearch" # Returns list of URLs only
WEB_CRAWL = "webCrawl" # Web crawl for a given URL
# Operation Type Rating - Helper class for capability ratings
class OperationTypeRating(BaseModel):
"""Represents an operation type with its capability rating (1-10)."""
operationType: OperationTypeEnum = Field(description="The operation type")
rating: int = Field(ge=1, le=10, description="Capability rating (1-10, higher = better for this operation type)")
def __str__(self) -> str:
return f"{self.operationType.value}({self.rating})"
def __repr__(self) -> str:
return f"OperationTypeRating({self.operationType.value}, {self.rating})"
# Helper function to create operation type ratings easily
def createOperationTypeRatings(*ratings: Tuple[OperationTypeEnum, int]) -> List[OperationTypeRating]:
"""
Helper function to create operation type ratings easily.
Usage:
operationTypes = createOperationTypeRatings(
(OperationTypeEnum.DATA_ANALYSE, 8),
(OperationTypeEnum.WEB_SEARCH, 10),
(OperationTypeEnum.WEB_CRAWL, 9)
)
"""
return [OperationTypeRating(operationType=ot, rating=rating) for ot, rating in ratings]
# Processing Modes
class ProcessingModeEnum(str, Enum):
BASIC = "basic"
ADVANCED = "advanced"
DETAILED = "detailed"
# Priority Levels
class PriorityEnum(str, Enum):
SPEED = "speed"
QUALITY = "quality"
COST = "cost"
BALANCED = "balanced"
# Model Capabilities - REMOVED: Not used in business logic
class AiModel(BaseModel):
"""Enhanced AI model definition with dynamic capabilities."""
# Core identification
name: str = Field(description="Actual LLM model name used for API calls")
displayName: str = Field(description="Human-readable model name with module prefix")
connectorType: str = Field(description="Type of connector (openai, anthropic, perplexity, tavily, etc.)")
# API configuration
apiUrl: str = Field(description="API endpoint URL for this model")
temperature: float = Field(default=0.2, ge=0.0, le=2.0, description="Default temperature for this model")
# Token and context limits
maxTokens: int = Field(description="Maximum tokens this model can generate")
contextLength: int = Field(description="Maximum context length this model can handle")
# Cost information
costPer1kTokensInput: float = Field(default=0.0, description="Cost per 1000 input tokens")
costPer1kTokensOutput: float = Field(default=0.0, description="Cost per 1000 output tokens")
# Performance ratings
speedRating: int = Field(ge=1, le=10, description="Speed rating (1-10, higher = faster)")
qualityRating: int = Field(ge=1, le=10, description="Quality rating (1-10, higher = better)")
# Function reference (not serialized)
functionCall: Optional[Callable] = Field(default=None, exclude=True, description="Function to call for this model")
calculatePriceUsd: Optional[Callable] = Field(default=None, exclude=True, description="Function to calculate price in USD")
# Selection criteria - capabilities with ratings
priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Default priority for this model. See PriorityEnum for available values.")
processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Default processing mode. See ProcessingModeEnum for available values.")
operationTypes: List[OperationTypeRating] = Field(default=[], description="Operation types this model can handle with capability ratings (1-10)")
minContextLength: Optional[int] = Field(default=None, description="Minimum context length required")
isAvailable: bool = Field(default=True, description="Whether model is currently available")
# Metadata
version: Optional[str] = Field(default=None, description="Model version")
lastUpdated: Optional[str] = Field(default=None, description="Last update timestamp")
model_config = ConfigDict(arbitrary_types_allowed=True) # Allow Callable type
class SelectionRule(BaseModel):
"""A rule for model selection."""
name: str = Field(description="Rule name identifier")
condition: str = Field(description="Description of when this rule applies")
weight: float = Field(description="Weight for scoring (higher = more important)")
operationTypes: List[OperationTypeEnum] = Field(description="Operation types this rule applies to")
priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level for this rule")
minQualityRating: Optional[int] = Field(default=None, description="Minimum quality rating")
maxCost: Optional[float] = Field(default=None, description="Maximum cost threshold")
minContextLength: Optional[int] = Field(default=None, description="Minimum context length required")
class AiCallOptions(BaseModel):
"""Options for centralized AI processing with clear operation types and tags."""
operationType: OperationTypeEnum = Field(default=OperationTypeEnum.DATA_ANALYSE, description="Type of operation")
priority: PriorityEnum = Field(default=PriorityEnum.BALANCED, description="Priority level")
compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately; else pool docs")
maxCost: Optional[float] = Field(default=None, description="Max cost budget")
maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode")
resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits (0.0-0.5)")
# Model generation parameters
temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation (0.0-2.0, lower = more consistent)")
maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch")
class AiCallRequest(BaseModel):
"""Centralized AI call request payload for interface use."""
prompt: str = Field(description="The user prompt")
context: Optional[str] = Field(default=None, description="Optional external context (e.g., extracted docs)")
options: AiCallOptions = Field(default_factory=AiCallOptions)
contentParts: Optional[List['ContentPart']] = None # NEW: Content parts for model-aware chunking
class AiCallResponse(BaseModel):
"""Standardized AI call response."""
content: str = Field(description="AI response content")
modelName: str = Field(description="Selected model name")
priceUsd: float = Field(default=0.0, description="Calculated price in USD")
processingTime: float = Field(default=0.0, description="Duration in seconds")
bytesSent: int = Field(default=0, description="Input data size in bytes")
bytesReceived: int = Field(default=0, description="Output data size in bytes")
errorCount: int = Field(default=0, description="0 for success, 1+ for errors")
class AiModelCall(BaseModel):
"""Standardized input for AI model calls."""
messages: List[Dict[str, Any]] = Field(description="Messages in OpenAI format (role, content)")
model: Optional[AiModel] = Field(default=None, description="The AI model being called")
options: AiCallOptions = Field(default_factory=AiCallOptions, description="Additional model-specific options")
model_config = ConfigDict(arbitrary_types_allowed=True)
class AiModelResponse(BaseModel):
"""Standardized output from AI model calls."""
content: str = Field(description="The AI response content")
success: bool = Field(default=True, description="Whether the call was successful")
error: Optional[str] = Field(default=None, description="Error message if success=False")
# Optional metadata that models can include
modelId: Optional[str] = Field(default=None, description="Model identifier used")
processingTime: Optional[float] = Field(default=None, description="Processing time in seconds")
tokensUsed: Optional[Dict[str, int]] = Field(default=None, description="Token usage (input, output, total)")
metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional model-specific metadata")
model_config = ConfigDict(arbitrary_types_allowed=True)
# Structured prompt models for specialized operations
class AiCallPromptWebSearch(BaseModel):
"""Structured prompt format for WEB_SEARCH operation - returns list of URLs."""
instruction: str = Field(description="Search instruction/query for finding relevant URLs")
country: Optional[str] = Field(default=None, description="Two-digit country code (lowercase, e.g., ch, us, de, fr)")
maxNumberPages: Optional[int] = Field(default=10, description="Maximum number of pages to search (default: 10)")
language: Optional[str] = Field(default=None, description="Language code (lowercase, e.g., de, en, fr)")
researchDepth: Optional[str] = Field(default="general", description="Research depth: fast (maxDepth=1), general (maxDepth=2), deep (maxDepth=3)")
class AiCallPromptWebCrawl(BaseModel):
"""Structured prompt format for WEB_CRAWL operation - crawls ONE specific URL and returns content."""
instruction: str = Field(description="Instruction for what content to extract from URL")
url: str = Field(description="Single URL to crawl")
maxDepth: Optional[int] = Field(default=2, description="Maximum number of hops from starting page (default: 2)")
maxWidth: Optional[int] = Field(default=10, description="Maximum pages to crawl per level (default: 10)")
class AiCallPromptImage(BaseModel):
"""Structured prompt format for image generation."""
prompt: str = Field(description="Text description of the image to generate")
size: Optional[str] = Field(default="1024x1024", description="Image size (1024x1024, 1792x1024, 1024x1792)")
quality: Optional[str] = Field(default="standard", description="Image quality (standard, hd)")
style: Optional[str] = Field(default="vivid", description="Image style (vivid, natural)")
class AiProcessParameters(BaseModel):
"""Parameters for AI processing action."""
aiPrompt: str = Field(description="AI instruction prompt")
contentParts: Optional[List[ContentPart]] = Field(
None,
description="Already-extracted content parts (required if documents need to be processed)"
)
resultType: str = Field(
default="txt",
description="Output file extension (txt, json, pdf, docx, xlsx, etc.)"
)
# NOTE: DocumentData, AiResponseMetadata, and AiResponse are defined in datamodelWorkflow.py
# Import them from there if needed: from modules.datamodels.datamodelWorkflow import DocumentData, AiResponseMetadata, AiResponse
class JsonAccumulationState(BaseModel):
"""State for JSON string accumulation during iterative AI generation."""
accumulatedJsonString: str = Field(description="Raw accumulated JSON string")
isAccumulationMode: bool = Field(description="True if we're accumulating fragments")
lastParsedResult: Optional[Dict[str, Any]] = Field(
default=None,
description="Last successfully parsed result (for prompt context)"
)
allSections: List[Dict[str, Any]] = Field(
default_factory=list,
description="Sections extracted so far (for prompt context)"
)
kpis: List[Dict[str, Any]] = Field(
default_factory=list,
description="KPI definitions with current values: [{id, description, jsonPath, targetValue, currentValue}, ...]"
)