import logging

from typing import Any, Dict, List, Optional, Union

from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType
from modules.interfaces.interfaceAiObjects import AiObjects

logger = logging.getLogger(__name__)


class SubCoreAi:
    """Core AI operations: image analysis, image generation, text generation, and planning calls."""

    def __init__(self, services, aiObjects: AiObjects):
        """Initialize core AI operations.

        Args:
            services: Service center instance for accessing other services
            aiObjects: Initialized AiObjects instance
        """
        self.services = services
        self.aiObjects = aiObjects

    # AI Processing Call
    async def callAi(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]] = None,
        placeholders: Optional[List[PromptPlaceholder]] = None,
        options: Optional[AiCallOptions] = None,
        outputFormat: Optional[str] = None,
        title: Optional[str] = None,
        documentProcessor=None,
        documentGenerator=None
    ) -> Union[str, Dict[str, Any]]:
        """
        Unified AI call interface that automatically routes to the appropriate handler.

        Args:
            prompt: The main prompt for the AI call
            documents: Optional list of documents to process
            placeholders: Optional list of placeholder replacements for planning calls
            options: AI call configuration options
            outputFormat: Optional output format (html, pdf, docx, txt, md, json, csv, xlsx) for document generation
            title: Optional title for generated documents
            documentProcessor: Document processing service instance
            documentGenerator: Document generation service instance

        Returns:
            AI response as a string, or a dict with documents if outputFormat is specified

        Raises:
            Exception: If all available models fail
        """
        if options is None:
            options = AiCallOptions()

        # Normalize placeholders from List[PromptPlaceholder] into
        # label -> content and label -> summaryAllowed lookups
        placeholders_dict: Dict[str, str] = {}
        placeholders_meta: Dict[str, bool] = {}
        if placeholders:
            placeholders_dict = {p.label: p.content for p in placeholders}
            placeholders_meta = {p.label: bool(getattr(p, 'summaryAllowed', False)) for p in placeholders}
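
        # Illustrative example (hypothetical values): a placeholder
        # PromptPlaceholder(label="notes", content="...", summaryAllowed=True)
        # yields placeholders_dict == {"notes": "..."} and placeholders_meta ==
        # {"notes": True}; the label is matched in the prompt template as
        # {{KEY:notes}} (see _buildPromptWithPlaceholders).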

        # Auto-determine call type based on documents and operation type
        call_type = self._determineCallType(documents, options.operationType)
        options.callType = call_type

        try:
            # Build the full prompt that will be sent to the AI and persist it
            # for debugging. Uses the same helper as the text path so the debug
            # output matches what is actually sent (best-effort; never raises).
            full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict)
            self._writeAiResponseDebug(
                label='ai_prompt_debug',
                content=full_prompt,
                partIndex=1,
                modelName=None,
                continuation=False
            )
        except Exception:
            pass

        # Handle document generation with a specific output format
        if outputFormat and documentGenerator:
            result = await documentGenerator.callAiWithDocumentGeneration(prompt, documents, options, outputFormat, title)
            # Log the AI response for debugging
            try:
                if isinstance(result, dict) and 'content' in result:
                    self._writeAiResponseDebug(
                        label='ai_document_generation',
                        content=result['content'],
                        partIndex=1,
                        modelName=None,  # Document generation doesn't return model info
                        continuation=False
                    )
            except Exception:
                pass
            return result

        if call_type == "planning":
            result = await self._callAiPlanning(prompt, placeholders_dict, placeholders_meta, options)
            # Log the AI response for debugging
            try:
                self._writeAiResponseDebug(
                    label='ai_planning',
                    content=result or "",
                    partIndex=1,
                    modelName=None,  # Planning doesn't return model info
                    continuation=False
                )
            except Exception:
                pass
            return result
        else:
            # Default to batch processing when the caller did not specify
            # per-document handling
            if options.processDocumentsIndividually is None and documents:
                options.processDocumentsIndividually = False

            # For text calls, build the full prompt with placeholders here,
            # since the text path does not handle placeholders itself
            if placeholders_dict:
                full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders_dict)
            else:
                full_prompt = prompt

            if documentProcessor:
                result = await documentProcessor.callAiText(full_prompt, documents, options)
            else:
                # Fall back to a direct AI call if no document processor is available
                request = AiCallRequest(
                    prompt=full_prompt,
                    context="",
                    options=options
                )
                response = await self.aiObjects.call(request)
                result = response.content

            # Log the AI response for debugging (text calls also log internally)
            try:
                self._writeAiResponseDebug(
                    label='ai_text_main',
                    content=result or "",
                    partIndex=1,
                    modelName=None,
                    continuation=False
                )
            except Exception:
                pass
            return result
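
    # Hypothetical usage sketch (caller-side names are assumptions, not part
    # of this module):
    #
    #   result = await coreAi.callAi(
    #       prompt="Create a plan for {{KEY:topic}}",
    #       placeholders=[PromptPlaceholder(label="topic", content="Q3 roadmap")],
    #       options=AiCallOptions(operationType=OperationType.GENERATE_PLAN),
    #   )
    #
    # With no documents and operationType GENERATE_PLAN this routes to the
    # planning handler; every other combination routes to the text handler.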

    # AI Image Analysis
    async def readImage(
        self,
        prompt: str,
        imageData: Union[str, bytes],
        mimeType: Optional[str] = None,
        options: Optional[AiCallOptions] = None,
    ) -> str:
        """Call AI for image analysis using interface.callImage()."""
        try:
            # Validate the image data
            if not imageData:
                error_msg = "No image data provided"
                self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
                logger.error(f"Error in AI image analysis: {error_msg}")
                return f"Error: {error_msg}"

            self.services.utils.debugLogToFile(f"readImage called, imageData type: {type(imageData)}, length: {len(imageData)}, mimeType: {mimeType}", "AI_SERVICE")
            logger.info(f"readImage called, imageData type: {type(imageData)}, length: {len(imageData)}, mimeType: {mimeType}")

            # Always use the IMAGE_ANALYSIS operation type for image processing
            if options is None:
                options = AiCallOptions(operationType=OperationType.IMAGE_ANALYSIS)
            else:
                # Override the operation type to ensure image analysis
                options.operationType = OperationType.IMAGE_ANALYSIS

            self.services.utils.debugLogToFile(f"Calling aiObjects.callImage with operationType: {options.operationType}", "AI_SERVICE")
            logger.info(f"Calling aiObjects.callImage with operationType: {options.operationType}")
            result = await self.aiObjects.callImage(prompt, imageData, mimeType, options)

            # Debug the raw result
            self.services.utils.debugLogToFile(f"Raw AI result type: {type(result)}, value: {repr(result)}", "AI_SERVICE")

            # Treat an empty or whitespace-only response as an error
            if not result or (isinstance(result, str) and not result.strip()):
                error_msg = f"No response from AI image analysis (result: {repr(result)})"
                self.services.utils.debugLogToFile(f"Error in AI image analysis: {error_msg}", "AI_SERVICE")
                logger.error(f"Error in AI image analysis: {error_msg}")
                return f"Error: {error_msg}"

            # Log a preview, truncating long results
            preview = result[:200] + "..." if len(result) > 200 else result
            self.services.utils.debugLogToFile(f"callImage returned: {preview}", "AI_SERVICE")
            logger.info(f"callImage returned: {preview}")
            return result
        except Exception as e:
            self.services.utils.debugLogToFile(f"Error in AI image analysis: {str(e)}", "AI_SERVICE")
            logger.error(f"Error in AI image analysis: {str(e)}")
            return f"Error: {str(e)}"

    # AI Image Generation
    async def generateImage(
        self,
        prompt: str,
        size: str = "1024x1024",
        quality: str = "standard",
        style: str = "vivid",
        options: Optional[AiCallOptions] = None,
    ) -> Dict[str, Any]:
        """Generate an image using interface.generateImage()."""
        try:
            return await self.aiObjects.generateImage(prompt, size, quality, style, options)
        except Exception as e:
            logger.error(f"Error in AI image generation: {str(e)}")
            return {"success": False, "error": str(e)}

    def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str:
        """
        Determine the call type based on documents and operation type.

        Criteria: no documents AND operationType == OperationType.GENERATE_PLAN -> "planning";
        all other cases -> "text".
        """
        has_documents = documents is not None and len(documents) > 0
        is_planning_operation = operation_type == OperationType.GENERATE_PLAN

        if not has_documents and is_planning_operation:
            return "planning"
        return "text"

    async def _callAiPlanning(
        self,
        prompt: str,
        placeholders: Optional[Dict[str, str]],
        placeholdersMeta: Optional[Dict[str, bool]],
        options: AiCallOptions
    ) -> str:
        """
        Handle planning calls with the placeholder system and selective summarization.
        """
        # Build the full prompt with placeholders; if it is too large, shrink
        # the summaryAllowed placeholders proportionally
        effective_placeholders = placeholders or {}
        full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)

        if options.compressPrompt and placeholdersMeta:
            # Determine the model's capacity in bytes
            try:
                caps = self._getModelCapabilitiesForContent(full_prompt, None, options)
                max_bytes = caps.get("maxContextBytes", len(full_prompt.encode("utf-8")))
            except Exception:
                max_bytes = len(full_prompt.encode("utf-8"))

            current_bytes = len(full_prompt.encode("utf-8"))
            if current_bytes > max_bytes:
                # Total bytes contributed by placeholders that may be summarized
                allowed_labels = [l for l, allow in placeholdersMeta.items() if allow]
                allowed_sizes = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels}
                total_allowed = sum(allowed_sizes.values())

                overage = current_bytes - max_bytes
                if total_allowed > 0 and overage > 0:
                    # Shrink every allowed placeholder by the same global ratio
                    # so that together they absorb the whole overage
                    target_allowed = max(total_allowed - overage, 0)
                    ratio = max(0.0, min(1.0, target_allowed / total_allowed))
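
                    # Worked example (illustrative numbers): max_bytes=10_000 and
                    # current_bytes=13_000 give overage=3_000. If the allowed
                    # placeholders total 8_000 bytes, target_allowed=5_000 and
                    # ratio=5_000/8_000=0.625, so each allowed placeholder keeps
                    # about 62.5% of its text while fixed prompt text is untouched.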
                    reduced: Dict[str, str] = {}
                    for label, content in effective_placeholders.items():
                        if label in allowed_labels and isinstance(content, str) and content:
                            reduced[label] = self._reduceText(content, ratio)
                        else:
                            reduced[label] = content

                    effective_placeholders = reduced
                    full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)

                    # If still over the limit (_reduceText truncates characters,
                    # not bytes, and appends a marker), run a second
                    # fine-adjustment pass with a recomputed ratio
                    current_bytes = len(full_prompt.encode("utf-8"))
                    if current_bytes > max_bytes and total_allowed > 0:
                        overage2 = current_bytes - max_bytes
                        # Recompute allowed sizes after the first reduction
                        allowed_sizes2 = {l: len((effective_placeholders.get(l) or "").encode("utf-8")) for l in allowed_labels}
                        total_allowed2 = sum(allowed_sizes2.values())
                        if total_allowed2 > 0 and overage2 > 0:
                            target_allowed2 = max(total_allowed2 - overage2, 0)
                            ratio2 = max(0.0, min(1.0, target_allowed2 / total_allowed2))
                            reduced2: Dict[str, str] = {}
                            for label, content in effective_placeholders.items():
                                if label in allowed_labels and isinstance(content, str) and content:
                                    reduced2[label] = self._reduceText(content, ratio2)
                                else:
                                    reduced2[label] = content
                            effective_placeholders = reduced2
                            full_prompt = self._buildPromptWithPlaceholders(prompt, effective_placeholders)

        # Make the AI call via AiObjects (it handles model selection)
        request = AiCallRequest(
            prompt=full_prompt,
            context="",  # Context is already included in the prompt
            options=options
        )
        response = await self.aiObjects.call(request)
        try:
            logger.debug(f"AI model selected (planning): {getattr(response, 'modelName', 'unknown')}")
        except Exception:
            pass
        return response.content

    async def _callAiDirect(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]],
        options: AiCallOptions,
        documentProcessor=None
    ) -> Dict[str, Any]:
        """
        Call AI directly with prompt and documents for JSON output.

        Used for multi-file generation; relies on the existing generation
        pipeline for proper document processing and content extraction.
        """
        logger.info(f"Using existing generation pipeline for {len(documents) if documents else 0} documents")

        if documentProcessor:
            # Process documents with JSON merging using the existing pipeline
            result = await documentProcessor.processDocumentsPerChunkJson(documents, prompt, options)
        else:
            # Fall back to a simple AI call
            request = AiCallRequest(
                prompt=prompt,
                context="",
                options=options
            )
            response = await self.aiObjects.call(request)
            result = {
                "metadata": {"title": "AI Response"},
                "sections": [{"id": "section_1", "type": "paragraph", "data": {"text": response.content}}]
            }

        # Wrap a single-file result in the multi-file format if needed
        if "sections" in result and "documents" not in result:
            logger.info("Converting single-file result to multi-file format")
            return {
                "metadata": result.get("metadata", {"title": "Converted Document"}),
                "documents": [{
                    "id": "doc_1",
                    "title": result.get("metadata", {}).get("title", "Document"),
                    "filename": "document.txt",
                    "sections": result.get("sections", [])
                }]
            }

        return result

    def _getModelCapabilitiesForContent(self, prompt: str, documents: Optional[List[ChatDocument]], options: AiCallOptions) -> Dict[str, int]:
        """
        Get model capabilities for content processing, including size limits for chunking.
        """
        # Estimate the total content size in bytes
        prompt_size = len(prompt.encode('utf-8'))
        document_size = 0
        if documents:
            # Rough estimate of document content size
            for doc in documents:
                document_size += doc.fileSize or 0

        total_size = prompt_size + document_size

        # Select the best model for this content size by scanning the model registry
        from modules.interfaces.interfaceAiObjects import aiModels

        best_model = None
        best_context_length = 0

        for model_name, model_info in aiModels.items():
            context_length = model_info.get("contextLength", 0)

            # Skip models with no declared context length
            if context_length == 0:
                continue

            # Check that the model supports the requested operation type
            capabilities = model_info.get("capabilities", [])
            if options.operationType == OperationType.IMAGE_ANALYSIS and "image_analysis" not in capabilities:
                continue
            elif options.operationType == OperationType.IMAGE_GENERATION and "image_generation" not in capabilities:
                continue
            elif options.operationType == OperationType.WEB_RESEARCH and "web_search" not in capabilities:
                continue
            elif "text_generation" not in capabilities:
                continue

            # Prefer models that can handle the content without chunking
            # (context of at least 80% of the content size); otherwise track
            # the largest model seen so far and rely on chunking. A model that
            # fails the 80% check always has a smaller context than one that
            # passes it, so the fallback can never displace a preferred choice.
            if context_length >= total_size * 0.8:
                if context_length > best_context_length:
                    best_model = model_info
                    best_context_length = context_length
            elif context_length > best_context_length:
                best_model = model_info
                best_context_length = context_length

        # Fall back to a reasonable default if no model was found
        if best_model is None:
            best_model = {
                "contextLength": 128000,  # GPT-4o default
                "llmName": "gpt-4o"
            }

        # Derive byte budgets from the context length
        # (rough conversion: 1 token ≈ 4 characters)
        context_length_bytes = int(best_model["contextLength"] * 4)
        max_context_bytes = int(context_length_bytes * 0.9)  # 90% of context length
        text_chunk_size = int(max_context_bytes * 0.7)  # 70% of max context for text chunks
        image_chunk_size = int(max_context_bytes * 0.8)  # 80% of max context for image chunks
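
        # Worked example with the fallback model (contextLength=128000):
        # 128000 tokens * 4 = 512000 bytes, maxContextBytes = 460800,
        # textChunkSize = 322560, imageChunkSize = 368640.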

        logger.debug(f"Selected model: {best_model.get('llmName', 'unknown')} with context length: {best_model['contextLength']}")
        logger.debug(f"Content size: {total_size} bytes, Max context: {max_context_bytes} bytes")
        logger.debug(f"Text chunk size: {text_chunk_size} bytes, Image chunk size: {image_chunk_size} bytes")

        return {
            "maxContextBytes": max_context_bytes,
            "textChunkSize": text_chunk_size,
            "imageChunkSize": image_chunk_size
        }

    def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
        """
        Get models capable of handling the specific operation, with capability filtering.
        """
        if hasattr(self, 'aiObjects') and self.aiObjects:
            # Let AiObjects handle the model selection instead of a hardcoded default
            return []
        else:
            # Fall back to a default model if AiObjects is not available
            default_model = ModelCapabilities(
                name="default",
                maxTokens=4000,
                capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
                costPerToken=0.001,
                processingTime=1.0,
                isAvailable=True
            )
            return [default_model]

    def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
        """
        Build the full prompt by replacing placeholders with their content.

        Supports both the legacy {{placeholder}} format and the new
        {{KEY:placeholder}} format.
        """
        if not placeholders:
            return prompt

        full_prompt = prompt
        for placeholder, content in placeholders.items():
            # Replace both the old format {{placeholder}} and the new format {{KEY:placeholder}}
            full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
            full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)

        return full_prompt
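
    # Example: _buildPromptWithPlaceholders("Plan {{KEY:topic}} from {{notes}}",
    # {"topic": "Q3", "notes": "raw minutes"}) returns
    # "Plan Q3 from raw minutes". Replacement is plain string substitution,
    # so placeholder content is inserted verbatim.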

    def _writeAiResponseDebug(self, label: str, content: str, partIndex: int = 1, modelName: Optional[str] = None, continuation: Optional[bool] = None) -> None:
        """Persist raw AI response parts for debugging under test-chat/ai (only if debug is enabled)."""
        try:
            # Check whether debug logging is enabled
            debug_enabled = self.services.utils.configGet("APP_DEBUG_CHAT_WORKFLOW_ENABLED", False)
            if not debug_enabled:
                return

            import os
            from datetime import datetime, UTC

            # Base dir: gateway/test-chat/ai (go up four levels from this file:
            # .../gateway/modules/services/serviceAi/subCoreAi.py -> gateway root)
            gatewayDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
            outDir = os.path.join(gatewayDir, 'test-chat', 'ai')
            os.makedirs(outDir, exist_ok=True)
            ts = datetime.now(UTC).strftime('%Y%m%d-%H%M%S-%f')[:-3]
            suffix = []
            if partIndex is not None:
                suffix.append(f"part{partIndex}")
            if continuation is not None:
                suffix.append(f"cont_{str(continuation).lower()}")
            if modelName:
                safeModel = ''.join(c if c.isalnum() or c in ('-', '_') else '-' for c in modelName)
                suffix.append(safeModel)
            suffixStr = ('_' + '_'.join(suffix)) if suffix else ''
            fname = f"{ts}_{label}{suffixStr}.txt"
            fpath = os.path.join(outDir, fname)
            with open(fpath, 'w', encoding='utf-8') as f:
                f.write(content or '')
        except Exception:
            # Best-effort debug write; never raise
            pass
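
    # Resulting filename sketch (timestamp illustrative):
    #   20250101-120000-000_ai_planning_part1_cont_false.txt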

    def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
        """
        Check whether text exceeds the model token limit, with a safety margin.
        """
        # Simple character-based estimation (about 4 characters per token)
        estimated_tokens = len(text) // 4
        max_tokens = int(model.maxTokens * (1 - safety_margin))
        return estimated_tokens > max_tokens
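
    # Example: a 100_000-character text estimates to 25_000 tokens; with
    # maxTokens=24_000 and safety_margin=0.1 the effective cap is 21_600,
    # so this returns True and the content must be reduced or chunked.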

    def _reducePlanningPrompt(
        self,
        full_prompt: str,
        placeholders: Optional[Dict[str, str]],
        model: ModelCapabilities,
        options: AiCallOptions
    ) -> str:
        """
        Reduce planning prompt size by summarizing placeholder content while
        preserving the prompt structure. Expects full_prompt to still contain
        its placeholder tokens.
        """
        if not placeholders:
            return self._reduceText(full_prompt, 0.7)

        # Reduce placeholder content while preserving the prompt itself
        reduced_placeholders = {}
        for placeholder, content in placeholders.items():
            if len(content) > 1000:  # Only reduce long content
                reduction_factor = 0.7
                reduced_content = self._reduceText(content, reduction_factor)
                reduced_placeholders[placeholder] = reduced_content
            else:
                reduced_placeholders[placeholder] = content

        return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders)

    def _reduceTextPrompt(
        self,
        prompt: str,
        context: str,
        model: ModelCapabilities,
        options: AiCallOptions
    ) -> str:
        """
        Reduce the text prompt size to fit the model limit.

        Uses a conservative one-character-per-token estimate for the size cap.
        """
        max_size = int(model.maxTokens * (1 - options.safetyMargin))

        if options.compressPrompt:
            # Reduce both prompt and context proportionally
            target_size = max_size
            current_size = len(prompt) + len(context)
            reduction_factor = (target_size * 0.7) / current_size if current_size > 0 else 1.0

            if reduction_factor < 1.0:
                prompt = self._reduceText(prompt, reduction_factor)
                context = self._reduceText(context, reduction_factor)
        else:
            # Only reduce the context, preserving prompt integrity
            max_context_size = max(max_size - len(prompt), 0)
            if len(context) > max_context_size:
                reduction_factor = max_context_size / len(context)
                context = self._reduceText(context, reduction_factor)

        return (prompt + "\n\n" + context) if context else prompt
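
    # Example (illustrative numbers): maxTokens=4_000 and safetyMargin=0.1
    # give max_size=3_600. With compressPrompt and a 1_000-character prompt
    # plus 5_000 characters of context, reduction_factor = (3_600 * 0.7) /
    # 6_000 = 0.42, so both parts are cut to 42% of their length.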

    def _extractTextFromContentParts(self, extracted_content) -> str:
        """
        Extract text content from ExtractionService ContentPart objects.
        """
        if not extracted_content or not hasattr(extracted_content, 'parts'):
            return ""

        text_parts = []
        for part in extracted_content.parts:
            # Keep only the text-bearing part types
            if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']:
                if hasattr(part, 'data') and part.data:
                    text_parts.append(part.data)

        return "\n\n".join(text_parts)

    def _reduceText(self, text: str, reduction_factor: float) -> str:
        """
        Reduce text size by the given factor via truncation, appending a marker.
        """
        if reduction_factor >= 1.0:
            return text

        target_length = int(len(text) * reduction_factor)
        return text[:target_length] + "... [reduced]"
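
    # Example: _reduceText("x" * 100, 0.5) keeps the first 50 characters and
    # appends "... [reduced]", so the result is slightly longer than half;
    # this is why _callAiPlanning runs a second fine-adjustment pass.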