From 8523da7fe2b9e89f7232a8186aa1ef00e185c712 Mon Sep 17 00:00:00 2001 From: ValueOn AG Date: Fri, 24 Oct 2025 22:46:05 +0200 Subject: [PATCH] cleanup pydantic v2: remove unnecessary pydantic-to-dict conversions, replace unnecessary unions with clean classes --- modules/aicore/aicorePluginAnthropic.py | 4 +- modules/connectors/connectorDbPostgre.py | 19 +- modules/datamodels/datamodelAi.py | 28 --- modules/datamodels/datamodelExtraction.py | 150 +++++--------- modules/interfaces/interfaceAiObjects.py | 11 +- modules/interfaces/interfaceDbChatObjects.py | 8 +- modules/routes/routeDataPrompts.py | 5 +- modules/routes/routeDataUsers.py | 10 +- .../serviceAi/subDocumentProcessing.py | 188 ++++++++---------- .../mainServiceExtraction.py | 5 +- .../merging/mergerDefault.py | 4 +- .../serviceExtraction/merging/mergerTable.py | 10 +- .../serviceExtraction/merging/mergerText.py | 10 +- .../services/serviceExtraction/subPipeline.py | 29 ++- test_ai_model_selection.py | 26 +-- 15 files changed, 188 insertions(+), 319 deletions(-) diff --git a/modules/aicore/aicorePluginAnthropic.py b/modules/aicore/aicorePluginAnthropic.py index 5f5e2bcb..83482d6f 100644 --- a/modules/aicore/aicorePluginAnthropic.py +++ b/modules/aicore/aicorePluginAnthropic.py @@ -50,7 +50,7 @@ class AiAnthropic(BaseConnectorAi): connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, - maxTokens=200000, + maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.015, costPer1kTokensOutput=0.075, @@ -75,7 +75,7 @@ class AiAnthropic(BaseConnectorAi): connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, - maxTokens=200000, + maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.015, costPer1kTokensOutput=0.075, diff --git a/modules/connectors/connectorDbPostgre.py b/modules/connectors/connectorDbPostgre.py index e01e267b..77441399 100644 --- a/modules/connectors/connectorDbPostgre.py +++ b/modules/connectors/connectorDbPostgre.py @@ -720,22 +720,9 @@ class DatabaseConnector: logger.info(f"Initial ID {initialId} for table {table} registered") return success else: - # Check if the existing initial ID still exists in the table - existingInitialId = systemData[table] - records = self.getRecordset( - model_class, recordFilter={"id": existingInitialId} - ) - if not records: - # The initial record no longer exists, update to the new one - systemData[table] = initialId - success = self._saveSystemTable(systemData) - if success: - logger.info( - f"Initial ID updated from {existingInitialId} to {initialId} for table {table}" - ) - return success - else: - return True + # Table already has an initial ID registered + logger.debug(f"Table {table} already has initial ID {systemData[table]}") + return True except Exception as e: logger.error(f"Error registering the initial ID for table {table}: {e}") return False diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 730c73cc..7643dc79 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -135,7 +135,6 @@ class AiCallOptions(BaseModel): compressPrompt: bool = Field(default=True, description="Whether to compress the prompt") compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary") processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately; else pool docs") - maxContextBytes: Optional[int] = Field(default=None, description="Hard cap
for extracted context size passed to the model") maxCost: Optional[float] = Field(default=None, description="Max cost budget") maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds") processingMode: ProcessingModeEnum = Field(default=ProcessingModeEnum.BASIC, description="Processing mode") @@ -145,7 +144,6 @@ class AiCallOptions(BaseModel): # Model generation parameters temperature: Optional[float] = Field(default=None, ge=0.0, le=2.0, description="Temperature for response generation (0.0-2.0, lower = more consistent)") - maxTokens: Optional[int] = Field(default=None, ge=1, le=32000, description="Maximum tokens in response") maxParts: Optional[int] = Field(default=1000, ge=1, le=1000, description="Maximum number of continuation parts to fetch") @@ -170,32 +168,6 @@ class AiCallResponse(BaseModel): errorCount: int = Field(default=0, description="0 for success, 1+ for errors") -class EnhancedAiCallOptions(AiCallOptions): - """Enhanced options for improved document processing with chunk mapping.""" - - # Parallel processing - enableParallelProcessing: bool = Field( - default=True, - description="Enable parallel processing of chunks" - ) - maxConcurrentChunks: int = Field( - default=5, - ge=1, - le=20, - description="Maximum number of chunks to process concurrently" - ) - - # Chunk mapping - preserveChunkMetadata: bool = Field( - default=True, - description="Preserve chunk metadata during processing" - ) - chunkSeparator: str = Field( - default="\n\n---\n\n", - description="Separator between chunks in merged output" - ) - - class AiModelCall(BaseModel): """Standardized input for AI model calls.""" diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py index b0ba0f9b..242d413a 100644 --- a/modules/datamodels/datamodelExtraction.py +++ b/modules/datamodels/datamodelExtraction.py @@ -1,6 +1,9 @@ -from typing import Any, Dict, List, Optional, Literal +from typing import Any, Dict, List, Optional, Literal, TYPE_CHECKING from pydantic import BaseModel, Field +if TYPE_CHECKING: + from modules.datamodels.datamodelAi import OperationTypeEnum + class ContentPart(BaseModel): id: str = Field(description="Unique content part identifier") @@ -40,106 +43,49 @@ class PartResult(BaseModel): class MergeStrategy(BaseModel): """Strategy configuration for merging content parts and AI results.""" + groupBy: str = Field(default="typeGroup", description="Field to group parts by (typeGroup, parentId, label, etc.)") + orderBy: str = Field(default="id", description="Field to order parts within groups (id, order, pageIndex, etc.)") + mergeType: Literal["concatenate", "hierarchical", "intelligent"] = Field(default="concatenate", description="How to merge content within groups") + maxSize: Optional[int] = Field(default=None, description="Maximum size for merged content in bytes") + textMerge: Optional[Dict[str, Any]] = Field(default=None, description="Text-specific merge settings (separator, formatting, etc.)") + tableMerge: Optional[Dict[str, Any]] = Field(default=None, description="Table-specific merge settings (header handling, etc.)") + structureMerge: Optional[Dict[str, Any]] = Field(default=None, description="Structure-specific merge settings (hierarchy, etc.)") + aiResultMerge: Optional[Dict[str, Any]] = Field(default=None, description="AI result merging settings (prompt, context, etc.)") + preserveChunks: bool = Field(default=False, description="Whether to preserve individual chunks or merge them") + chunkSeparator: str = 
Field(default="\n\n---\n\n", description="Separator between chunks when merging") + preserveMetadata: bool = Field(default=True, description="Whether to preserve metadata from original parts") + metadataFields: Optional[List[str]] = Field(default=None, description="Specific metadata fields to preserve (None = all)") + onError: Literal["skip", "include", "fail"] = Field(default="skip", description="How to handle errors during merging") + validateContent: bool = Field(default=True, description="Whether to validate content before merging") + useIntelligentMerging: bool = Field(default=False, description="Whether to use intelligent token-aware merging") + prompt: Optional[str] = Field(default=None, description="Prompt for intelligent merging") + capabilities: Optional[Dict[str, Any]] = Field(default=None, description="Model capabilities for intelligent merging") - # Grouping configuration - groupBy: str = Field( - default="typeGroup", - description="Field to group parts by (typeGroup, parentId, label, etc.)" - ) - - # Ordering configuration - orderBy: str = Field( - default="id", - description="Field to order parts within groups (id, order, pageIndex, etc.)" - ) - - # Merge behavior - mergeType: Literal["concatenate", "hierarchical", "intelligent"] = Field( - default="concatenate", - description="How to merge content within groups" - ) - - # Size limits - maxSize: Optional[int] = Field( - default=None, - description="Maximum size for merged content in bytes" - ) - - # Type-specific merge settings - textMerge: Optional[Dict[str, Any]] = Field( - default=None, - description="Text-specific merge settings (separator, formatting, etc.)" - ) - - tableMerge: Optional[Dict[str, Any]] = Field( - default=None, - description="Table-specific merge settings (header handling, etc.)" - ) - - structureMerge: Optional[Dict[str, Any]] = Field( - default=None, - description="Structure-specific merge settings (hierarchy, etc.)" - ) - - # AI result merging - aiResultMerge: Optional[Dict[str, Any]] = Field( - default=None, - description="AI result merging settings (prompt, context, etc.)" - ) - - # Chunk handling - preserveChunks: bool = Field( - default=False, - description="Whether to preserve individual chunks or merge them" - ) - - chunkSeparator: str = Field( - default="\n\n---\n\n", - description="Separator between chunks when merging" - ) - - # Metadata handling - preserveMetadata: bool = Field( - default=True, - description="Whether to preserve metadata from original parts" - ) - - metadataFields: Optional[List[str]] = Field( - default=None, - description="Specific metadata fields to preserve (None = all)" - ) - - # Error handling - onError: Literal["skip", "include", "fail"] = Field( - default="skip", - description="How to handle errors during merging" - ) - - # Validation - validateContent: bool = Field( - default=True, - description="Whether to validate content before merging" - ) - - def getTypeSpecificSettings(self, typeGroup: str) -> Dict[str, Any]: - """Get type-specific merge settings for a content type.""" - if typeGroup == "text" and self.textMerge: - return self.textMerge - elif typeGroup == "table" and self.tableMerge: - return self.tableMerge - elif typeGroup == "structure" and self.structureMerge: - return self.structureMerge - else: - return {} - - def shouldPreserveChunk(self, chunk: Dict[str, Any]) -> bool: - """Determine if a chunk should be preserved based on strategy.""" - if not self.preserveChunks: - return False - - # Check if chunk has error metadata - if self.onError == "skip" and 
chunk.get("metadata", {}).get("error"): - return False - - return True +class ExtractionOptions(BaseModel): + """Options for document extraction and processing with clear data structures.""" + + # Core extraction parameters + prompt: str = Field(description="Extraction prompt for AI processing") + operationType: 'OperationTypeEnum' = Field(description="Type of operation for AI processing") + processDocumentsIndividually: bool = Field(default=True, description="Process each document separately") + + # Image processing parameters + imageMaxPixels: int = Field(default=1024 * 1024, ge=1, description="Maximum pixels for image processing") + imageQuality: int = Field(default=85, ge=1, le=100, description="Image quality (1-100)") + + # Merging strategy + mergeStrategy: MergeStrategy = Field(description="Strategy for merging extraction results") + + # Optional chunking parameters (for backward compatibility) + chunkAllowed: Optional[bool] = Field(default=None, description="Whether chunking is allowed") + maxSize: Optional[int] = Field(default=None, description="Maximum size for processing") + textChunkSize: Optional[int] = Field(default=None, description="Size for text chunks") + imageChunkSize: Optional[int] = Field(default=None, description="Size for image chunks") + + # Additional processing options + enableParallelProcessing: bool = Field(default=True, description="Enable parallel processing of chunks") + maxConcurrentChunks: int = Field(default=5, ge=1, le=20, description="Maximum number of chunks to process concurrently") + + class Config: + arbitraryTypesAllowed = True # Allow OperationTypeEnum import diff --git a/modules/interfaces/interfaceAiObjects.py b/modules/interfaces/interfaceAiObjects.py index a76e5e41..51fa90af 100644 --- a/modules/interfaces/interfaceAiObjects.py +++ b/modules/interfaces/interfaceAiObjects.py @@ -127,7 +127,7 @@ class AiObjects: logger.info(f"Attempting AI call with model: {model.name} (attempt {attempt + 1}/{len(failoverModelList)})") # Call the model - response = await self._callWithModel(model, prompt, context) + response = await self._callWithModel(model, prompt, context, options) logger.info(f"✅ AI call successful with model: {model.name}") return response @@ -204,7 +204,7 @@ class AiObjects: if partSize <= modelContextBytes: # Part fits - call AI directly - response = await self._callWithModel(model, prompt, contentPart.data) + response = await self._callWithModel(model, prompt, contentPart.data, options) logger.info(f"✅ Content part processed successfully with model: {model.name}") return response else: @@ -216,7 +216,7 @@ class AiObjects: # Process each chunk chunkResults = [] for chunk in chunks: - chunkResponse = await self._callWithModel(model, prompt, chunk['data']) + chunkResponse = await self._callWithModel(model, prompt, chunk['data'], options) chunkResults.append(chunkResponse) # Merge chunk results @@ -393,7 +393,7 @@ class AiObjects: errorCount=1 ) - async def _callWithModel(self, model: AiModel, prompt: str, context: str) -> AiCallResponse: + async def _callWithModel(self, model: AiModel, prompt: str, context: str, options: AiCallOptions = None) -> AiCallResponse: """Call a specific model and return the response.""" # Calculate input bytes from prompt and context inputBytes = len((prompt + context).encode('utf-8')) @@ -430,7 +430,8 @@ class AiObjects: # Create standardized call object modelCall = AiModelCall( messages=messages, - model=model + model=model, + options=options or {} ) # Call the model with standardized interface diff --git 
a/modules/interfaces/interfaceDbChatObjects.py b/modules/interfaces/interfaceDbChatObjects.py index 2c952058..31db88d8 100644 --- a/modules/interfaces/interfaceDbChatObjects.py +++ b/modules/interfaces/interfaceDbChatObjects.py @@ -873,7 +873,7 @@ class ChatObjects: stat = ChatStat(**statData) # Create the stat record in the database - created = self.db.recordCreate(ChatStat, stat.model_dump()) + created = self.db.recordCreate(ChatStat, stat) # Return the created ChatStat return ChatStat(**created) @@ -937,7 +937,7 @@ class ChatObjects: items.append({ "type": "message", "createdAt": msg_timestamp, - "item": chat_message.model_dump() + "item": chat_message }) # Get logs @@ -952,7 +952,7 @@ class ChatObjects: items.append({ "type": "log", "createdAt": log_timestamp, - "item": chat_log.model_dump() + "item": chat_log }) # Get stats list @@ -966,7 +966,7 @@ class ChatObjects: items.append({ "type": "stat", "createdAt": stat_timestamp, - "item": stat.model_dump() + "item": stat }) # Sort all items by createdAt timestamp for chronological order diff --git a/modules/routes/routeDataPrompts.py b/modules/routes/routeDataPrompts.py index 97da7846..51968808 100644 --- a/modules/routes/routeDataPrompts.py +++ b/modules/routes/routeDataPrompts.py @@ -48,11 +48,8 @@ async def create_prompt( """Create a new prompt""" managementInterface = interfaceDbComponentObjects.getInterface(currentUser) - # Convert Prompt to dict for interface - prompt_data = prompt.model_dump() - # Create prompt - newPrompt = managementInterface.createPrompt(prompt_data) + newPrompt = managementInterface.createPrompt(prompt) return Prompt(**newPrompt) diff --git a/modules/routes/routeDataUsers.py b/modules/routes/routeDataUsers.py index 578eeeb0..afd286d6 100644 --- a/modules/routes/routeDataUsers.py +++ b/modules/routes/routeDataUsers.py @@ -92,11 +92,8 @@ async def create_user( """Create a new user""" appInterface = interfaceDbAppObjects.getInterface(currentUser) - # Convert User to dict for interface - user_dict = user_data.model_dump() - # Create user - newUser = appInterface.createUser(user_dict) + newUser = appInterface.createUser(user_data) return newUser @@ -119,11 +116,8 @@ async def update_user( detail=f"User with ID {userId} not found" ) - # Convert User to dict for interface - update_data = userData.model_dump() - # Update user - updatedUser = appInterface.updateUser(userId, update_data) + updatedUser = appInterface.updateUser(userId, userData) if not updatedUser: raise HTTPException( diff --git a/modules/services/serviceAi/subDocumentProcessing.py b/modules/services/serviceAi/subDocumentProcessing.py index 726ff62d..8cdca91c 100644 --- a/modules/services/serviceAi/subDocumentProcessing.py +++ b/modules/services/serviceAi/subDocumentProcessing.py @@ -5,7 +5,7 @@ import time from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum -from modules.datamodels.datamodelExtraction import ChunkResult, ContentExtracted, PartResult +from modules.datamodels.datamodelExtraction import ChunkResult, ContentExtracted, PartResult, ExtractionOptions, MergeStrategy from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService logger = logging.getLogger(__name__) @@ -33,19 +33,6 @@ class SubDocumentProcessing: self._extractionService = ExtractionService(self.services) return self._extractionService - def _calculateMaxContextBytes(self, options: 
Optional[AiCallOptions]) -> int: - """Calculate maximum context bytes based on model capabilities and options.""" - if options and options.maxContextBytes: - return options.maxContextBytes - - # Default model capabilities (this should be enhanced with actual model registry) - defaultMaxTokens = 4000 - safetyMargin = options.safetyMargin if options else 0.1 - - # Calculate bytes (4 chars per token estimation) - maxContextBytes = int(defaultMaxTokens * (1 - safetyMargin) * 4) - - return maxContextBytes async def processDocumentsPerChunk( self, @@ -68,22 +55,23 @@ class SubDocumentProcessing: if not documents: return "" - # Build extraction options WITHOUT chunking parameters - extractionOptions: Dict[str, Any] = { - "prompt": prompt, - "operationType": options.operationType if options else OperationTypeEnum.DATA_EXTRACT, - "processDocumentsIndividually": True, - # REMOVED: maxSize, textChunkSize, imageChunkSize - "mergeStrategy": { - "useIntelligentMerging": True, - "prompt": prompt, - "groupBy": "typeGroup", - "orderBy": "id", - "mergeType": "concatenate" - }, - } + # Build extraction options using Pydantic model + mergeStrategy = MergeStrategy( + useIntelligentMerging=True, + prompt=prompt, + groupBy="typeGroup", + orderBy="id", + mergeType="concatenate" + ) - logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}") + extractionOptions = ExtractionOptions( + prompt=prompt, + operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT, + processDocumentsIndividually=True, + mergeStrategy=mergeStrategy + ) + + logger.debug(f"Per-chunk extraction options: prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}") try: # Extract content WITHOUT chunking @@ -120,21 +108,23 @@ class SubDocumentProcessing: if not documents: return {"metadata": {"title": "Empty Document"}, "sections": []} - # Build extraction options WITHOUT chunking parameters - extractionOptions: Dict[str, Any] = { - "prompt": prompt, - "operationType": options.operationType if options else OperationTypeEnum.DATA_EXTRACT, - "processDocumentsIndividually": True, - "mergeStrategy": { - "useIntelligentMerging": True, - "prompt": prompt, - "groupBy": "typeGroup", - "orderBy": "id", - "mergeType": "concatenate" - }, - } + # Build extraction options using Pydantic model + mergeStrategy = MergeStrategy( + useIntelligentMerging=True, + prompt=prompt, + groupBy="typeGroup", + orderBy="id", + mergeType="concatenate" + ) - logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}") + extractionOptions = ExtractionOptions( + prompt=prompt, + operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT, + processDocumentsIndividually=True, + mergeStrategy=mergeStrategy + ) + + logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}") try: # Extract content WITHOUT chunking @@ -205,31 +195,25 @@ class SubDocumentProcessing: if not documents: return {"metadata": {"title": "Empty Document"}, "sections": []} - # Get model capabilities for size calculation - model_capabilities = self._getModelCapabilitiesForContent(custom_prompt, documents, options) + # Build extraction options using Pydantic model (model-aware chunking in AI call 
phase) + mergeStrategy = MergeStrategy( + useIntelligentMerging=True, + prompt=custom_prompt, + groupBy="typeGroup", + orderBy="id", + mergeType="concatenate" + ) - # Build extraction options for chunking with intelligent merging - extractionOptions: Dict[str, Any] = { - "prompt": custom_prompt, # Use the custom prompt instead of default - "operationType": options.operationType if options else OperationTypeEnum.DATA_EXTRACT, - "processDocumentsIndividually": True, # Process each document separately - "maxSize": model_capabilities["maxContextBytes"], - "chunkAllowed": True, - "textChunkSize": model_capabilities["textChunkSize"], - "imageChunkSize": model_capabilities["imageChunkSize"], - "imageMaxPixels": 1024 * 1024, - "imageQuality": 85, - "mergeStrategy": { - "useIntelligentMerging": True, # Enable intelligent token-aware merging - "capabilities": model_capabilities, - "prompt": custom_prompt, # Use the custom prompt - "groupBy": "typeGroup", - "orderBy": "id", - "mergeType": "concatenate" - }, - } + extractionOptions = ExtractionOptions( + prompt=custom_prompt, # Use the custom prompt instead of default + operationType=options.operationType if options else OperationTypeEnum.DATA_EXTRACT, + processDocumentsIndividually=True, # Process each document separately + imageMaxPixels=1024 * 1024, + imageQuality=85, + mergeStrategy=mergeStrategy + ) - logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.get('prompt', ''))} chars, operationType={extractionOptions.get('operationType')}") + logger.debug(f"Per-chunk extraction options (JSON mode): prompt length={len(extractionOptions.prompt)} chars, operationType={extractionOptions.operationType}") try: # Extract content with chunking @@ -1042,15 +1026,13 @@ CONTINUATION INSTRUCTIONS: content_parts.append(content_part) # Use existing merging strategy from options - merge_strategy = { - "useIntelligentMerging": True, - "groupBy": "documentId", # Group by document - "orderBy": "partIndex", # Order by part index - "mergeType": "concatenate" - } + merge_strategy = MergeStrategy( + useIntelligentMerging=True, + groupBy="documentId", # Group by document + orderBy="partIndex", # Order by part index + mergeType="concatenate" + ) - if options and hasattr(options, 'mergeStrategy'): - merge_strategy.update(options.mergeStrategy) # Apply existing merging logic using the sophisticated merging system from modules.services.serviceExtraction.subPipeline import _applyMerging @@ -1095,15 +1077,13 @@ CONTINUATION INSTRUCTIONS: content_parts.append(content_part) # Use existing merging strategy for JSON mode - merge_strategy = { - "useIntelligentMerging": True, - "groupBy": "documentId", # Group by document - "orderBy": "partIndex", # Order by part index - "mergeType": "concatenate" - } + merge_strategy = MergeStrategy( + useIntelligentMerging=True, + groupBy="documentId", # Group by document + orderBy="partIndex", # Order by part index + mergeType="concatenate" + ) - if options and hasattr(options, 'mergeStrategy'): - merge_strategy.update(options.mergeStrategy) # Apply existing merging logic using the sophisticated merging system from modules.services.serviceExtraction.subPipeline import _applyMerging @@ -1234,15 +1214,13 @@ CONTINUATION INSTRUCTIONS: content_parts.append(content_part) # Use existing merging strategy from options - merge_strategy = { - "useIntelligentMerging": True, - "groupBy": "documentId", # Group by document - "orderBy": "chunkIndex", # Order by chunk index - "mergeType": "concatenate" - } + merge_strategy = 
MergeStrategy( + useIntelligentMerging=True, + groupBy="documentId", # Group by document + orderBy="chunkIndex", # Order by chunk index + mergeType="concatenate" + ) - if options and hasattr(options, 'mergeStrategy'): - merge_strategy.update(options.mergeStrategy) # Apply existing merging logic using the sophisticated merging system from modules.services.serviceExtraction.subPipeline import _applyMerging @@ -1297,15 +1275,13 @@ CONTINUATION INSTRUCTIONS: content_parts.append(content_part) # Use existing merging strategy for clean mode - merge_strategy = { - "useIntelligentMerging": True, - "groupBy": "documentId", # Group by document - "orderBy": "chunkIndex", # Order by chunk index - "mergeType": "concatenate" - } + merge_strategy = MergeStrategy( + useIntelligentMerging=True, + groupBy="documentId", # Group by document + orderBy="chunkIndex", # Order by chunk index + mergeType="concatenate" + ) - if options and hasattr(options, 'mergeStrategy'): - merge_strategy.update(options.mergeStrategy) # Apply existing merging logic using the sophisticated merging system from modules.services.serviceExtraction.subPipeline import _applyMerging @@ -1351,15 +1327,13 @@ CONTINUATION INSTRUCTIONS: content_parts.append(content_part) # Use existing merging strategy for JSON mode - merge_strategy = { - "useIntelligentMerging": True, - "groupBy": "documentId", # Group by document - "orderBy": "chunkIndex", # Order by chunk index - "mergeType": "concatenate" - } + merge_strategy = MergeStrategy( + useIntelligentMerging=True, + groupBy="documentId", # Group by document + orderBy="chunkIndex", # Order by chunk index + mergeType="concatenate" + ) - if options and hasattr(options, 'mergeStrategy'): - merge_strategy.update(options.mergeStrategy) # Apply existing merging logic using the sophisticated merging system from modules.services.serviceExtraction.subPipeline import _applyMerging @@ -1455,5 +1429,3 @@ CONTINUATION INSTRUCTIONS: logger.info(f"Merged {len(chunkResults)} chunks using existing sophisticated merging system (JSON mode)") return merged_document - -# REMOVED: _getModelCapabilitiesForContent method - no longer needed with model-aware chunking diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py index eef95638..62931565 100644 --- a/modules/services/serviceExtraction/mainServiceExtraction.py +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -5,7 +5,7 @@ import time from .subRegistry import ExtractorRegistry, ChunkerRegistry from .subPipeline import runExtraction -from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy +from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, MergeStrategy, ExtractionOptions from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelAi import AiCallResponse from modules.aicore.aicoreModelRegistry import modelRegistry @@ -20,7 +20,7 @@ class ExtractionService: self._extractorRegistry = ExtractorRegistry() self._chunkerRegistry = ChunkerRegistry() - def extractContent(self, documents: List[ChatDocument], options: Dict[str, Any]) -> List[ContentExtracted]: + def extractContent(self, documents: List[ChatDocument], options: ExtractionOptions) -> List[ContentExtracted]: """ Extract content from a list of ChatDocument objects. 
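Reviewer note: with this change, callers hand extractContent a typed ExtractionOptions object instead of a loose dict. The snippet below is a minimal sketch of the new call shape, assuming the models and service defined in this patch; the `services` registry and `documents` list are placeholders, not part of the patch.

```python
from modules.datamodels.datamodelAi import OperationTypeEnum
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService

# Typed merge configuration replaces the old nested dict; the mergers now read
# strategy.groupBy / strategy.orderBy / strategy.maxSize as attributes.
strategy = MergeStrategy(
    useIntelligentMerging=True,
    groupBy="typeGroup",
    orderBy="id",
    mergeType="concatenate",
)

options = ExtractionOptions(
    prompt="Extract the key figures from the attached report",
    operationType=OperationTypeEnum.DATA_EXTRACT,
    processDocumentsIndividually=True,
    mergeStrategy=strategy,
)

service = ExtractionService(services)  # `services` stands in for the app's service registry
extracted = service.extractContent(documents, options)  # `documents` is a list of ChatDocument objects
```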
@@ -31,6 +31,7 @@ class ExtractionService: Returns: List of ContentExtracted objects, one per input document """ + results: List[ContentExtracted] = [] # Lazy import to avoid circular deps and heavy init at module import diff --git a/modules/services/serviceExtraction/merging/mergerDefault.py b/modules/services/serviceExtraction/merging/mergerDefault.py index ceab6635..9a7a625a 100644 --- a/modules/services/serviceExtraction/merging/mergerDefault.py +++ b/modules/services/serviceExtraction/merging/mergerDefault.py @@ -1,9 +1,9 @@ from typing import Any, Dict, List -from modules.datamodels.datamodelExtraction import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart, MergeStrategy class DefaultMerger: - def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + def merge(self, parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]: """ Default merger that passes through parts unchanged. Used for image, binary, metadata, container typeGroups. diff --git a/modules/services/serviceExtraction/merging/mergerTable.py b/modules/services/serviceExtraction/merging/mergerTable.py index 4f62358c..cffce8f1 100644 --- a/modules/services/serviceExtraction/merging/mergerTable.py +++ b/modules/services/serviceExtraction/merging/mergerTable.py @@ -1,10 +1,10 @@ from typing import Any, Dict, List -from modules.datamodels.datamodelExtraction import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart, MergeStrategy from ..subUtils import makeId class TableMerger: - def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + def merge(self, parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]: """ Merge table parts based on strategy. Strategy options: @@ -15,9 +15,9 @@ class TableMerger: if not parts: return parts - groupBy = strategy.get("groupBy", "parentId") - maxSize = strategy.get("maxSize", 0) - combineSheets = strategy.get("combineSheets", False) + groupBy = strategy.groupBy + maxSize = strategy.maxSize or 0 + combineSheets = strategy.tableMerge.get("combineSheets", False) if strategy.tableMerge else False # Group parts groups = self._groupParts(parts, groupBy, combineSheets) diff --git a/modules/services/serviceExtraction/merging/mergerText.py b/modules/services/serviceExtraction/merging/mergerText.py index 38f7c6f0..9e1ccf47 100644 --- a/modules/services/serviceExtraction/merging/mergerText.py +++ b/modules/services/serviceExtraction/merging/mergerText.py @@ -1,10 +1,10 @@ from typing import Any, Dict, List -from modules.datamodels.datamodelExtraction import ContentPart +from modules.datamodels.datamodelExtraction import ContentPart, MergeStrategy from ..subUtils import makeId class TextMerger: - def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + def merge(self, parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]: """ Merge text parts based on strategy. 
Strategy options: @@ -15,9 +15,9 @@ class TextMerger: if not parts: return parts - groupBy = strategy.get("groupBy", "parentId") - orderBy = strategy.get("orderBy", "label") - maxSize = strategy.get("maxSize", 0) + groupBy = strategy.groupBy + orderBy = strategy.orderBy + maxSize = strategy.maxSize or 0 # Group parts groups = self._groupParts(parts, groupBy) diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py index 9d8193da..e935f3c3 100644 --- a/modules/services/serviceExtraction/subPipeline.py +++ b/modules/services/serviceExtraction/subPipeline.py @@ -1,8 +1,7 @@ -from typing import Any, Dict, List +from typing import List import logging -import os -from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart +from modules.datamodels.datamodelExtraction import ContentExtracted, ContentPart, ExtractionOptions, MergeStrategy from .subUtils import makeId from .subRegistry import ExtractorRegistry, ChunkerRegistry from .merging.mergerText import TextMerger @@ -13,13 +12,13 @@ from .subMerger import IntelligentTokenAwareMerger logger = logging.getLogger(__name__) -def _mergeParts(parts: List[ContentPart], mergeStrategy: Dict[str, Any]) -> List[ContentPart]: +def _mergeParts(parts: List[ContentPart], mergeStrategy: MergeStrategy) -> List[ContentPart]: """Merge parts based on the provided strategy.""" if not parts or not mergeStrategy: return parts - groupBy = mergeStrategy.get("groupBy", "typeGroup") - orderBy = mergeStrategy.get("orderBy", "id") + groupBy = mergeStrategy.groupBy + orderBy = mergeStrategy.orderBy # Group parts by the specified field groups = {} @@ -56,7 +55,8 @@ def _mergeParts(parts: List[ContentPart], mergeStrategy: Dict[str, Any]) -> List return merged_parts -def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ContentExtracted: +def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: ExtractionOptions) -> ContentExtracted: + extractor = extractorRegistry.resolve(mimeType, fileName) if extractor is None: # fallback: single binary part @@ -71,15 +71,14 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker ) return ContentExtracted(id=makeId(), parts=[part]) - parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options}) + parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType}) # REMOVED: poolAndLimit(parts, chunkerRegistry, options) # REMOVED: Chunking logic - now handled in AI call phase # Apply merging strategy if provided (preserve existing logic) - mergeStrategy = options.get("mergeStrategy", {}) - if mergeStrategy: - parts = _applyMerging(parts, mergeStrategy) + if options.mergeStrategy: + parts = _applyMerging(parts, options.mergeStrategy) return ContentExtracted(id=makeId(), parts=parts) @@ -87,17 +86,17 @@ def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: Chunker # REMOVED: poolAndLimit function - chunking now handled in AI call phase -def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: +def _applyMerging(parts: List[ContentPart], strategy: MergeStrategy) -> List[ContentPart]: """Apply merging strategy to parts with intelligent token-aware merging.""" logger.debug(f"_applyMerging called with {len(parts)} 
parts") # Check if intelligent merging is enabled - if strategy.get("useIntelligentMerging", False): - model_capabilities = strategy.get("capabilities", {}) + if strategy.useIntelligentMerging: + model_capabilities = strategy.capabilities or {} subMerger = IntelligentTokenAwareMerger(model_capabilities) # Use intelligent merging for all parts - merged = subMerger.merge_chunks_intelligently(parts, strategy.get("prompt", "")) + merged = subMerger.merge_chunks_intelligently(parts, strategy.prompt or "") # Calculate and log optimization stats stats = subMerger.calculate_optimization_stats(parts, merged) diff --git a/test_ai_model_selection.py b/test_ai_model_selection.py index ea8a6798..62324cba 100644 --- a/test_ai_model_selection.py +++ b/test_ai_model_selection.py @@ -90,7 +90,7 @@ class ModelSelectionTester: totalScore = sizeRating + processingModeRating + priorityRating print( - f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" + f" {idx:>2}. {m.displayName} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" ) print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}") @@ -136,7 +136,7 @@ class ModelSelectionTester: totalScore = sizeRating + processingModeRating + priorityRating print( - f" {idx:>2}. {m.name} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" + f" {idx:>2}. {m.displayName} | Q={getattr(m, 'qualityRating', 0)} | S={getattr(m, 'speedRating', 0)} | ${costIn:.4f} | ctx={getattr(m, 'contextLength', 0)} | score={totalScore:.3f}" ) print(f" Size: {sizeRating:.3f}, ProcessingMode: {processingModeRating:.3f}, Priority: {priorityRating:.3f}") @@ -365,8 +365,8 @@ class ModelSelectionTester: ) if failoverModelList: - print(f" Selected model: {failoverModelList[0].name}") - print(f" Fallback models: {[m.name for m in failoverModelList[1:3]]}") + print(f" Selected model: {failoverModelList[0].displayName}") + print(f" Fallback models: {[m.displayName for m in failoverModelList[1:3]]}") else: print(" No suitable models found") @@ -393,8 +393,8 @@ class ModelSelectionTester: ) if failoverModelList: - print(f" Selected model: {failoverModelList[0].name}") - print(f" Fallback models: {[m.name for m in failoverModelList[1:3]]}") + print(f" Selected model: {failoverModelList[0].displayName}") + print(f" Fallback models: {[m.displayName for m in failoverModelList[1:3]]}") else: print(" No suitable models found") @@ -421,8 +421,8 @@ class ModelSelectionTester: ) if failoverModelList: - print(f" Selected model: {failoverModelList[0].name}") - print(f" Fallback models: {[m.name for m in failoverModelList[1:3]]}") + print(f" Selected model: {failoverModelList[0].displayName}") + print(f" Fallback models: {[m.displayName for m in failoverModelList[1:3]]}") else: print(" No suitable models found") @@ -449,8 +449,8 @@ class ModelSelectionTester: ) if failoverModelList: - print(f" Selected model: {failoverModelList[0].name}") - print(f" Fallback models: {[m.name for m in failoverModelList[1:3]]}") + print(f" Selected model: {failoverModelList[0].displayName}") + print(f" Fallback models: {[m.displayName for m in failoverModelList[1:3]]}") else: print(" No suitable models found") @@ -479,15 +479,15 @@ class ModelSelectionTester: 
print(f" {connector_type}: {len(models)} models") for model in models: capabilities = getattr(model, 'capabilities', []) - print(f" - {model.name}: {capabilities}") + print(f" - {model.displayName}: {capabilities}") # Show operation type support print(f"\nOperation type support:") for op_type in OperationTypeEnum: - supported_models = [m for m in availableModels if hasattr(m, 'operationTypes') and op_type in m.operationTypes] + supported_models = [m for m in availableModels if hasattr(m, 'operationTypes') and any(ot.operationType == op_type for ot in m.operationTypes)] print(f" {op_type.name}: {len(supported_models)} models") if supported_models: - model_names = [m.name for m in supported_models[:3]] # Show first 3 models + model_names = [m.displayName for m in supported_models[:3]] # Show first 3 models print(f" Models: {', '.join(model_names)}")