enhanced ai and extraction engine

This commit is contained in:
ValueOn AG 2025-09-30 00:03:10 +02:00
parent 21f9e60af1
commit d20ab7ab0e
2 changed files with 616 additions and 0 deletions

View file

@ -0,0 +1,388 @@
# Dynamic Generic AI Calls Implementation Strategy
## Overview
This document outlines the implementation strategy for a robust, model-agnostic AI service that handles both planning and text processing calls with intelligent fallbacks, size management, and integration with the existing ExtractionService architecture.
## Core Principles
1. **Model Fallback Strategy**: Iterative model trying for maximum reliability
2. **Size Management**: 90% token limit with configurable safety margins
3. **Separation of Concerns**: Planning vs. text calls have different parameter sets and processing logic
4. **TypeGroup-Aware Processing**: Leverage existing chunking and merging logic based on content types
5. **Capability-Based Model Selection**: Only use models capable of handling specific operations
## Call Type Distinction
### Planning Calls
**Criteria**: `no documents AND (operationType in ["generate_plan", "analyse_content"])`
**Characteristics**:
- Use placeholder system for selective content summarization
- Prompt integrity is critical (can be protected with `compressPrompt=False`)
- Placeholders can be summarized while preserving prompt structure
- Examples: Task planning, action definition, validation, decision making
### Text Calls
**Criteria**: `has documents OR operationType not in ["generate_plan", "analyse_content"]`
**Characteristics**:
- Process documents through ExtractionService
- Use typeGroup-aware chunking and merging
- Can process documents individually or as a group
- Examples: Document analysis, content generation, format conversion
## Enhanced Data Models
### AiCallOptions
```python
class AiCallOptions(BaseModel):
# Existing fields
operationType: OperationType
priority: Priority = Priority.BALANCED
compressPrompt: bool = True
compressContext: bool = True
processDocumentsIndividually: bool = False
maxContextBytes: Optional[int] = None
# New fields for dynamic strategy
callType: Literal["planning", "text"] = Field(default_factory=lambda: "text")
safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5)
modelCapabilities: Optional[List[str]] = None # e.g., ["text", "image", "vision"]
```
### Model Capabilities
```python
class ModelCapabilities(BaseModel):
name: str
maxTokens: int
capabilities: List[str] # ["text", "image", "vision", "reasoning", "analysis"]
costPerToken: float
processingTime: float
isAvailable: bool = True
```
## Implementation Architecture
### 1. Unified AI Call Interface
```python
async def callAi(
self,
prompt: str,
documents: Optional[List[ChatDocument]] = None,
placeholders: Optional[Dict[str, str]] = None,
options: AiCallOptions
) -> str:
"""
Unified AI call interface that automatically routes to appropriate handler.
Args:
prompt: The main prompt for the AI call
documents: Optional list of documents to process
placeholders: Optional dictionary of placeholder replacements for planning calls
options: AI call configuration options
Returns:
AI response as string
Raises:
Exception: If all available models fail
"""
# Auto-determine call type based on documents and operation type
call_type = self._determineCallType(documents, options.operationType)
if call_type == "planning":
return await self._callAiPlanning(prompt, placeholders, options)
else:
return await self._callAiText(prompt, documents, options)
```
### 2. Planning Call Implementation
```python
async def _callAiPlanning(
self,
prompt: str,
placeholders: Optional[Dict[str, str]],
options: AiCallOptions
) -> str:
"""
Handle planning calls with placeholder system and selective summarization.
Process:
1. Get models capable of planning operations
2. Build full prompt with placeholders
3. Check token limits and reduce if needed
4. Try each model until one succeeds
"""
# Get available models for planning (text + reasoning capabilities)
models = self._getModelsForOperation("planning", options)
for model in models:
try:
# Build full prompt with placeholders
full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders)
# Check size and reduce if needed
if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin):
full_prompt = self._reducePlanningPrompt(full_prompt, placeholders, model, options)
# Make AI call
result = await self._callModel(model, full_prompt, options)
return result
except Exception as e:
logger.warning(f"Planning model {model.name} failed: {e}")
continue
raise Exception("All planning models failed - check model availability and capabilities")
```
### 3. Text Call Implementation
```python
async def _callAiText(
self,
prompt: str,
documents: Optional[List[ChatDocument]],
options: AiCallOptions
) -> str:
"""
Handle text calls with document processing through ExtractionService.
Process:
1. Get models capable of text operations
2. Extract and process documents using ExtractionService
3. Check token limits and reduce if needed
4. Try each model until one succeeds
"""
# Get available models for text processing
models = self._getModelsForOperation("text", options)
for model in models:
try:
# Extract and process documents using ExtractionService
context = ""
if documents:
extracted_content = await self.extractionService.extractDocuments(
documentList=[{
"id": d.id,
"bytes": d.fileData,
"fileName": d.fileName,
"mimeType": d.mimeType
} for d in documents],
options={
"prompt": prompt,
"operationType": options.operationType.value,
"processDocumentsIndividually": options.processDocumentsIndividually,
"maxSize": options.maxContextBytes or int(model.maxTokens * 0.9),
"chunkAllowed": not options.compressContext,
"mergeStrategy": {"groupBy": "typeGroup"}
}
)
# Get text content from extracted parts using typeGroup-aware processing
context = self._extractTextFromContentParts(extracted_content)
# Check size and reduce if needed
full_prompt = prompt + "\n\n" + context if context else prompt
if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin):
full_prompt = self._reduceTextPrompt(prompt, context, model, options)
# Make AI call
result = await self._callModel(model, full_prompt, options)
return result
except Exception as e:
logger.warning(f"Text model {model.name} failed: {e}")
continue
raise Exception("All text models failed - check model availability and capabilities")
```
### 4. Model Selection Strategy
```python
def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[Model]:
"""
Get models capable of handling the specific operation with capability filtering.
Args:
operation_type: "planning" or "text"
options: AI call options including required capabilities
Returns:
List of models sorted by priority and capability match
"""
all_models = self._getAvailableModels()
# Filter by operation type capabilities
if operation_type == "planning":
capable_models = [m for m in all_models
if "text" in m.capabilities and "reasoning" in m.capabilities]
elif operation_type == "text":
capable_models = [m for m in all_models if "text" in m.capabilities]
else:
capable_models = all_models
# Filter by specific capabilities if requested
if options.modelCapabilities:
capable_models = [m for m in capable_models
if all(cap in m.capabilities for cap in options.modelCapabilities)]
# Sort by priority preference (quality > balanced > speed > cost)
return self._sortModelsByPriority(capable_models, options.priority)
```
### 5. Size Management with TypeGroup-Aware Chunking
```python
def _reduceTextPrompt(
self,
prompt: str,
context: str,
model: Model,
options: AiCallOptions
) -> str:
"""
Reduce text prompt size using typeGroup-aware chunking and merging.
Args:
prompt: Original prompt
context: Extracted document context
model: Target model with token limits
options: AI call options
Returns:
Reduced prompt that fits within token limits
"""
max_size = int(model.maxTokens * (1 - options.safetyMargin))
if options.compressPrompt:
# Reduce both prompt and context
target_size = max_size
current_size = len(prompt) + len(context)
reduction_factor = (target_size * 0.7) / current_size
if reduction_factor < 1.0:
prompt = self._reduceText(prompt, reduction_factor)
context = self._reduceTextWithTypeGroups(context, reduction_factor, options)
else:
# Only reduce context, preserve prompt integrity
max_context_size = max_size - len(prompt)
if len(context) > max_context_size:
reduction_factor = max_context_size / len(context)
context = self._reduceTextWithTypeGroups(context, reduction_factor, options)
return prompt + "\n\n" + context if context else prompt
def _reduceTextWithTypeGroups(
self,
context: str,
reduction_factor: float,
options: AiCallOptions
) -> str:
"""
Reduce text using typeGroup-aware chunking and merging strategies.
Leverages existing chunking/merging modules:
- text_chunker.py / text_merger.py
- table_chunker.py / table_merger.py
- structure_chunker.py / default_merger.py
"""
if options.compressContext:
# Summarize content using AI
return await self._summarizeContent(context, reduction_factor)
else:
# Chunk content using typeGroup-aware chunkers
return await self._chunkContent(context, reduction_factor, options)
```
## Integration Points
### 1. ExtractionService Integration
- Use `extractionService.extractDocuments()` for all document processing
- Leverage existing 3-pass pipeline (Extract → Chunk → Merge)
- Utilize typeGroup-based processing for different content types
### 2. Existing Chunking/Merging Logic
- **Text Content**: `text_chunker.py` / `text_merger.py`
- **Table Content**: `table_chunker.py` / `table_merger.py`
- **Structured Content**: `structure_chunker.py` / `default_merger.py`
### 3. Model Capability Management
- Maintain model capability registry
- Filter models based on operation requirements
- Support dynamic model availability
## Error Handling Strategy
### Model Failure Handling
1. **Individual Model Failure**: Log warning, try next model
2. **All Models Failed**: Return error with diagnostic information including:
- List of attempted models
- Failure reasons for each model
- Suggested alternatives or parameter adjustments
### Size Management Failures
1. **Token Limit Exceeded**: Apply reduction strategies
2. **Reduction Failed**: Fall back to emergency chunking
3. **Critical Content Lost**: Return error with size analysis
## Configuration and Tuning
### Safety Margins
- **Default**: 10% safety margin (0.1)
- **Configurable**: Per-call basis via `AiCallOptions.safetyMargin`
- **Range**: 0.0 to 0.5 (0% to 50% safety margin)
### Model Selection Priority
1. **Quality**: Best model for accuracy
2. **Balanced**: Good balance of speed and quality
3. **Speed**: Fastest available model
4. **Cost**: Most cost-effective model
### Size Reduction Strategies
- **Prompt Compression**: When `compressPrompt=True`
- **Context Summarization**: When `compressContext=True`
- **Document Chunking**: When `processDocumentsIndividually=True`
## Migration Strategy
### Phase 1: Enhanced AiCallOptions
- Add new fields to `AiCallOptions` model
- Update existing AI calls to use new options
### Phase 2: Unified Interface
- Implement `callAi()` as unified entry point
- Maintain backward compatibility with existing `callAiText()`
### Phase 3: Model Management
- Implement model capability registry
- Add model selection and fallback logic
### Phase 4: Size Management
- Integrate with ExtractionService
- Implement typeGroup-aware reduction strategies
### Phase 5: Full Migration
- Migrate all AI calls to use unified interface
- Remove legacy AI call methods
## Benefits
1. **Reliability**: Multiple model fallbacks ensure high success rate
2. **Efficiency**: Intelligent size management prevents token limit issues
3. **Flexibility**: TypeGroup-aware processing handles diverse content types
4. **Maintainability**: Centralized logic reduces code duplication
5. **Scalability**: Easy to add new models and capabilities
6. **Integration**: Seamless integration with existing ExtractionService
## Future Enhancements
1. **Dynamic Model Loading**: Load models on-demand based on requirements
2. **Performance Monitoring**: Track model performance and optimize selection
3. **Cost Optimization**: Balance quality vs. cost based on use case
4. **Caching**: Cache processed content for repeated operations
5. **Streaming**: Support for streaming responses from models

View file

@ -0,0 +1,228 @@
## PowerON Extraction Service Concept and Architecture
### Goals
- **Normalize** any document into a small set of processingready typeGroups: `text`, `table`, `structure`, `image`, `binary`, `metadata`, `container`.
- **Decouple** extraction (split/normalize) from chunking and merging.
- **Scale** to multipart/container formats (pdf, office) using recursive splitting.
- **Control** cost/latency by honoring `maxSize` and `chunkAllowed` with AI only when needed.
- **Integrate** with AI Prompt Builder entrypoint and support `operationType` behavior.
### New Service Location
- Base: `gateway/modules/services/serviceExtraction/mainServiceExtraction.py`
- Submodules:
- `gateway/modules/services/serviceExtraction/subRegistry.py` (extractor and chunker registries)
- `gateway/modules/services/serviceExtraction/subPipeline.py` (3pass orchestration)
- `gateway/modules/services/serviceExtraction/formats/` (performat extractors)
- `gateway/modules/services/serviceExtraction/chunking/` (pertypeGroup chunkers)
- `gateway/modules/services/serviceExtraction/merging/` (pertypeGroup mergers)
- `gateway/modules/services/serviceExtraction/utils/` (encoding, mime, helpers)
No backwards compatibility is required; this is a clean introduction.
### Core Data Model (standardized outputs)
- ContentPart
- `id: str`
- `parentId: Optional[str]` (preserve hierarchy; root has `None`)
- `label: str` (e.g., "page_2", "sheet_Jan", "table_1")
- `typeGroup: Literal["text","table","structure","image","binary","metadata","container"]`
- `mimeType: str`
- `data: str` (utf8 text for `text|table|structure`; base64 for `image|binary`; empty for `container`)
- `metadata: Dict[str, Any]` (size, pages, width/height, pageIndex, sheetName, sourceRanges, checksum, confidence, warnings)
- ExtractedContent
- `id: str` (document id)
- `parts: List[ContentPart]` (flat list; hierarchy via `parentId`)
- `summary: Optional[Dict[str, Any]]`
Notes:
- `metadata.sourceRanges` or page/sheet indices allow provenance for merges/summaries.
- `metadata.confidence` and `metadata.warnings` guide downstream AI/UX decisions.
### MIME → typeGroup mapping (deterministic first)
- `text/plain`, `text/markdown``text`
- `text/csv``table`
- `application/json`, `application/xml`, `text/html`, `image/svg+xml``structure`
- `image/*``image`
- `application/pdf`, `application/vnd.openxmlformats-officedocument.*``container`
- otherwise → `binary`
Container extractors are responsible for disaggregating into basic typeGroups.
### 3Pass Pipeline
1) Identify and normalize (Split/Extract)
- Start with a root `container` part representing the raw file.
- Resolve extractor by `mimeType`/extension via registry.
- Recursively split container formats into child parts until only basic typeGroups remain (`text|table|structure|image|binary|metadata`).
- Output a single `ExtractedContent` per input document.
2) Chunk
- Route each basic typeGroup to its chunker:
- `text` → sizebounded line/paragraph aware
- `table` → rowbounded (CSV lines), schema aware optional
- `structure` → JSON object/XML subtree/HTML block aware
- `image`, `binary`, `metadata`, `container` → no chunking by default
- Chunkers return `chunks: List[Dict]` with backreferences (`partId`, `order`).
3) Merge
- Strategy driven by call options and workflow:
- `text` → concatenate by logical order (page/section) or keep per part
- `table` → keep separate per table/sheet; optional schema merge
- `structure` → preserve keys/paths; avoid lossy merges
- `image|binary` → usually passthrough
- `metadata|container` → excluded by default
### Registries
- ExtractorRegistry (in `subRegistry.py`)
- Maps `mimeType`/extension to an `Extractor` instance.
- Fallbacks: content sniffing, default binary extractor.
- ChunkerRegistry (in `subRegistry.py`)
- Maps `typeGroup` to a `Chunker`.
### Base Interfaces
Use camelCase and prefix internal methods with `_`.
```python
class Extractor:
def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: ...
def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: ...
class Chunker:
def chunk(self, part: ContentPart, options: Dict[str, Any]) -> List[Dict[str, Any]]: ...
class Merger:
def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: ...
```
### Format Extractors (under `formats/`)
- `text_extractor.py` → emits one `text` part
- `csv_extractor.py` → emits one `table` part (CSV payload)
- `json_extractor.py`, `xml_extractor.py`, `html_extractor.py`, `svg_extractor.py` → emit `structure` parts
- `image_extractor.py` → emits one `image` part; optional OCR is handled by AI postprocessing
- `pdf_extractor.py` → emits `container` root with children:
- per page: `text` part if text found
- per page: extracted images as `image` parts
- per page/section metadata as `metadata`
- `docx_extractor.py``container` + children: headings `structure`, paragraphs `text`, tables `table`, comments `metadata`
- `xlsx_extractor.py``container` + children: each sheet as `table` CSV; properties `metadata`; charts as `image` or `structure`
- `pptx_extractor.py``container` + slides: text boxes `text`, tables `table`, images `image`, notes `metadata`
- `legacy_*_extractor.py``metadata` + `binary` with clear limitations
- `binary_extractor.py` → single `binary` part
### Chunkers (under `chunking/`)
- `text_chunker.py` → size/paragraph aware; configurable sizes
- `table_chunker.py` → split by row count/bytes, keep header propagation
- `structure_chunker.py` → JSON object buckets, XML subtree buckets, HTML block buckets
- `binary_chunker.py` → byte slicing when explicitly requested
- `noop_chunker.py` → for image/metadata/container
### Mergers (under `merging/`)
- `text_merger.py` → page/section aware concatenation
- `table_merger.py` → per sheet/table; optional schema merge
- `structure_merger.py` → key/path preserving grouping
- `default_merger.py` → passthrough
### Orchestration (in `subPipeline.py`)
Highlevel flow for one document:
```python
def runExtraction(document: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ExtractedContent:
# Pass 1: extract/normalize
parts = _extractAll(document, fileName, mimeType, options)
# Pass 2: chunk if allowed
if options.get("chunkAllowed", False):
chunks = _chunkParts(parts, options)
else:
chunks = []
# Pass 3: merge per strategy
merged = _merge(parts, chunks, options.get("mergeStrategy", {}))
return ExtractedContent(id=_makeId(), parts=merged, summary=_buildSummary(parts))
```
### Entry Point and Options (in `mainServiceExtraction.py`)
The service is invoked by AI Prompt Builder with `(documentList, options)`.
Supported options and effects:
- `prompt: str`
- If present, enables optional AI augmentation on extracted content/chunks based on `operationType`.
- `operationType: Literal["general","generate_plan","analyse_content","generate_content","web_research"]`
- `general`/`analyse_content`: prefer deterministic extraction; AI can summarize or answer over chunks.
- `generate_plan`: produce structured `structure` outputs (bullet points, tasks) from `text` chunks.
- `generate_content`: allow AI synthesis over merged `text` parts within `maxSize`.
- `web_research`: treat extracted `structure` and `text` as context; AI orchestrator may fetch more docs upstream.
- `processDocumentsIndividually: bool`
- `True`: run the 3pass pipeline per document; apply `maxSize` per document; return list of results.
- `False`: extract all docs → pool parts → global chunk/merge → apply `maxSize` across the pool; keep provenance by `parentId` and `documentId`.
- `maxSize: int` and `chunkAllowed: bool`
- Hard cap on total size of content passed to AI.
- If `chunkAllowed=True` → prefer chunking to stay under `maxSize`; process chunks iteratively in priority order (e.g., text before images, or by page order).
- If `chunkAllowed=False` → do not chunk; instead summarize down (per part, then hierarchical) until under `maxSize`.
Size governance policy:
1) Compute sizes for candidate parts/chunks.
2) If total ≤ `maxSize` → pass through.
3) If total > `maxSize` and `chunkAllowed` → progressively include highestvalue chunks until the cap; optionally add a final global summary.
4) If total > `maxSize` and not chunkAllowed → summarize per part, then merge summaries; ensure final text ≤ cap.
### AI Integration
- AI is optional and strictly after extraction.
- Recommended placements:
- OCR/VLM for `image` parts when requested.
- LLM summarization for large `text|structure|table` parts to respect `maxSize` when `chunkAllowed=False`.
- LLM question answering (`analyse_content`) over selected chunks.
- All AI calls must respect budget/time guards and the size cap.
### Error Handling
- Every extractor must return either valid parts or a `metadata` part with `warnings/error` plus a `binary` fallback when applicable.
- Include enough context in `metadata` to diagnose issues (library missing, parse error details) without leaking sensitive content.
### Ordering and Provenance
- Preserve logical order within a document (page index, slide index, sheet index).
- Maintain `parentId` links to reconstruct hierarchy during merge and summarization.
### Testing Strategy
- Unit tests per extractor on small fixtures for each format.
- Contract tests for the 3pass pipeline (endtoend) with mixed multipart documents.
- Sizecap tests validating chunking vs summarization paths.
### Migration Notes
- Existing monolithic logic can be moved into `formats/*` and `utils/*` preserving robust decoding and Office/PDF heuristics, while removing AI calls from extractors.
- `ContentItem` usage should shift to `ContentPart` (no backward compatibility required).
### Minimal Pseudocode processDocumentsIndividually
```python
def extractDocuments(documentList: List[Dict], options: Dict[str, Any]):
if options.get("processDocumentsIndividually", True):
results = []
for doc in documentList:
ec = runExtraction(doc.bytes, doc.fileName, doc.mimeType, options)
ec = _applyAiIfRequested(ec, options) # respects maxSize + chunkAllowed
results.append(ec)
return results
else:
# global pool
parts = []
for doc in documentList:
ec = runExtraction(doc.bytes, doc.fileName, doc.mimeType, options)
parts.extend(_tagWithDocumentId(ec.parts, doc.id))
pooled = _poolAndLimit(parts, options) # chunk/summarize to cap
pooled = _applyAiIfRequestedOverPool(pooled, options)
return pooled
```
### Defaults and Configuration
- Chunk sizes per typeGroup are centralized and configurable.
- Merge strategies (text concat policy, table schema inference) are pluggable.
- Registries support runtime extension (new formats) without touching the pipeline.
### Summary
This design introduces a small, stable contract (`ContentPart` with `typeGroup`) and a 3pass pipeline that:
- normalizes diverse documents into uniform parts,
- chunks only what benefits from chunking,
- merges predictably for downstream AI and workflow steps,
while strictly enforcing `maxSize` and honoring `chunkAllowed` and `processDocumentsIndividually`.