From cbea086f91f04dcc185c1426c399c9c5c647b178 Mon Sep 17 00:00:00 2001
From: ValueOn AG
Date: Tue, 30 Sep 2025 00:02:51 +0200
Subject: [PATCH] Revise generic dynamic AI call center with dynamic generic
 content extraction engine (ready for test)
---
modules/datamodels/datamodelAi.py | 22 +-
modules/datamodels/datamodelExtraction.py | 21 +
modules/services/serviceAi/mainServiceAi.py | 360 +++++++++++++--
.../services/serviceExtraction/__init__.py | 5 +
.../serviceExtraction/chunking/__init__.py | 2 +
.../chunking/structure_chunker.py | 59 +++
.../chunking/table_chunker.py | 28 ++
.../chunking/text_chunker.py | 28 ++
.../serviceExtraction/formats/__init__.py | 2 +
.../formats/binary_extractor.py | 25 ++
.../formats/csv_extractor.py | 26 ++
.../formats/docx_extractor.py | 89 ++++
.../formats/html_extractor.py | 30 ++
.../formats/image_extractor.py | 25 ++
.../formats/json_extractor.py | 31 ++
.../formats/pdf_extractor.py | 110 +++++
.../formats/text_extractor.py | 26 ++
.../formats/xlsx_extractor.py | 93 ++++
.../formats/xml_extractor.py | 30 ++
.../mainServiceExtraction.py | 57 +++
.../serviceExtraction/merging/__init__.py | 0
.../merging/default_merger.py | 11 +
.../serviceExtraction/merging/table_merger.py | 152 +++++++
.../serviceExtraction/merging/text_merger.py | 136 ++++++
.../services/serviceExtraction/subPipeline.py | 186 ++++++++
.../services/serviceExtraction/subRegistry.py | 103 +++++
.../serviceExtraction/utils/__init__.py | 7 +
modules/workflows/methods/methodAi.py | 65 ++-
modules/workflows/methods/methodDocument.py | 40 +-
modules/workflows/methods/methodOutlook.py | 24 +-
modules/workflows/methods/methodWeb.py | 20 +-
modules/workflows/processing/handlingTasks.py | 188 ++++++--
.../processing/promptFactoryPlaceholders.py | 418 ++++++++++++++++++
33 files changed, 2262 insertions(+), 157 deletions(-)
create mode 100644 modules/datamodels/datamodelExtraction.py
create mode 100644 modules/services/serviceExtraction/__init__.py
create mode 100644 modules/services/serviceExtraction/chunking/__init__.py
create mode 100644 modules/services/serviceExtraction/chunking/structure_chunker.py
create mode 100644 modules/services/serviceExtraction/chunking/table_chunker.py
create mode 100644 modules/services/serviceExtraction/chunking/text_chunker.py
create mode 100644 modules/services/serviceExtraction/formats/__init__.py
create mode 100644 modules/services/serviceExtraction/formats/binary_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/csv_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/docx_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/html_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/image_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/json_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/pdf_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/text_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/xlsx_extractor.py
create mode 100644 modules/services/serviceExtraction/formats/xml_extractor.py
create mode 100644 modules/services/serviceExtraction/mainServiceExtraction.py
create mode 100644 modules/services/serviceExtraction/merging/__init__.py
create mode 100644 modules/services/serviceExtraction/merging/default_merger.py
create mode 100644 modules/services/serviceExtraction/merging/table_merger.py
create mode 100644 modules/services/serviceExtraction/merging/text_merger.py
create mode 100644 modules/services/serviceExtraction/subPipeline.py
create mode 100644 modules/services/serviceExtraction/subRegistry.py
create mode 100644 modules/services/serviceExtraction/utils/__init__.py
create mode 100644 modules/workflows/processing/promptFactoryPlaceholders.py
diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py
index 2d2e5e95..bab071b6 100644
--- a/modules/datamodels/datamodelAi.py
+++ b/modules/datamodels/datamodelAi.py
@@ -1,4 +1,4 @@
-from typing import Optional, List
+from typing import Optional, List, Dict, Any, Literal
from pydantic import BaseModel, Field
@@ -81,18 +81,36 @@ PROCESSING_MODE_PRIORITY_MAPPING = {
}
+class ModelCapabilities(BaseModel):
+ """Model capabilities and characteristics for dynamic selection."""
+
+ name: str = Field(description="Model name/identifier")
+ maxTokens: int = Field(description="Maximum token limit for this model")
+ capabilities: List[str] = Field(description="List of capabilities: text, image, vision, reasoning, analysis, etc.")
+ costPerToken: float = Field(default=0.0, description="Cost per token (if available)")
+ processingTime: float = Field(default=1.0, description="Average processing time multiplier")
+ isAvailable: bool = Field(default=True, description="Whether model is currently available")
+
+
class AiCallOptions(BaseModel):
"""Options for centralized AI processing with clear operation types and tags."""
operationType: str = Field(default="general", description="Type of operation: general, generate_plan, analyse_content, generate_content, web_research")
priority: str = Field(default="balanced", description="speed|quality|cost|balanced")
compressPrompt: bool = Field(default=True, description="Whether to compress the prompt")
- compressContext: bool = Field(default=True, description="Whether to compress optional context")
+ compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary")
+ processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately; else pool docs")
+ maxContextBytes: Optional[int] = Field(default=None, description="Hard cap for extracted context size passed to the model")
maxCost: Optional[float] = Field(default=None, description="Max cost budget")
maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds")
requiredTags: Optional[List[str]] = Field(default=None, description="Required model tags for selection")
processingMode: str = Field(default="basic", description="Processing mode: basic, advanced, detailed")
resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.")
+
+ # New fields for dynamic strategy
+ callType: Literal["planning", "text"] = Field(default="text", description="Call type: planning or text")
+ safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits (0.0-0.5)")
+ modelCapabilities: Optional[List[str]] = Field(default=None, description="Required model capabilities for filtering")
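+
+    # Illustrative usage (a sketch, not part of the schema): a text call capped to
+    # ~16 KB of extracted context with a 15% token safety margin:
+    #   AiCallOptions(operationType="analyse_content", callType="text",
+    #                 maxContextBytes=16000, safetyMargin=0.15)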
class AiCallRequest(BaseModel):
diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py
new file mode 100644
index 00000000..78ce657e
--- /dev/null
+++ b/modules/datamodels/datamodelExtraction.py
@@ -0,0 +1,21 @@
+from typing import Any, Dict, List, Optional
+from dataclasses import dataclass, field
+
+
+@dataclass
+class ContentPart:
+ id: str
+ parentId: Optional[str]
+ label: str
+ typeGroup: str
+ mimeType: str
+ data: str
+ metadata: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass
+class ExtractedContent:
+ id: str
+ parts: List[ContentPart]
+ summary: Optional[Dict[str, Any]] = None
+
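+# Illustrative construction (a sketch; ids are normally produced by the service's makeId()):
+#   ContentPart(id="p1", parentId=None, label="page_1", typeGroup="text",
+#               mimeType="text/plain", data="Hello", metadata={"size": 5})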
diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py
index 0bd00a90..be0b0bad 100644
--- a/modules/services/serviceAi/mainServiceAi.py
+++ b/modules/services/serviceAi/mainServiceAi.py
@@ -2,8 +2,8 @@ import logging
from typing import Dict, Any, List, Optional, Tuple, Union
from modules.datamodels.datamodelChat import ChatDocument
-from modules.services.serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService
-from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions
+from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService
+from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities
from modules.datamodels.datamodelWeb import (
WebSearchRequest,
WebCrawlRequest,
@@ -34,7 +34,7 @@ class AiService:
self.serviceCenter = serviceCenter
# Only depend on interfaces
self.aiObjects = None # Will be initialized in create()
- self.documentExtractor = DocumentExtractionService()
+ self.extractionService = ExtractionService()
@classmethod
async def create(cls, serviceCenter=None) -> "AiService":
@@ -59,10 +59,14 @@ class AiService:
documents,
options.operationType if options else "general",
options.compressContext if options else True,
- processDocumentsIndividually,
+ options.processDocumentsIndividually if options else processDocumentsIndividually,
)
effectiveOptions = options or AiCallOptions()
+    # Default maxContextBytes if not provided; per-model-tag defaults could be added here
+    if effectiveOptions.maxContextBytes is None:
+        effectiveOptions.maxContextBytes = 16000  # bytes; conservative default when the model limit is unknown
+
request = AiCallRequest(
prompt=prompt,
context=documentContent or None,
@@ -167,42 +171,60 @@ class AiService:
if not documents:
return ""
+ # Build extraction options
+ extractionOptions: Dict[str, Any] = {
+ "prompt": f"Extract relevant content for {operationType}",
+ "operationType": operationType,
+ "processDocumentsIndividually": processIndividually,
+ # Respect size/ chunking hints if provided via AiCallOptions
+ "maxSize": getattr(getattr(self, "_aiOptions", None), "maxContextBytes", None) or 0,
+ "chunkAllowed": getattr(getattr(self, "_aiOptions", None), "chunkAllowed", True),
+ # basic merge strategy for text by parent
+ "mergeStrategy": {"groupBy": "parentId", "orderBy": "pageIndex"},
+ }
+
+ # Prepare documentList for extractor
+ documentList: List[Dict[str, Any]] = []
+ for d in documents:
+ documentList.append({
+ "id": d.id,
+ "bytes": d.fileData,
+ "fileName": d.fileName,
+ "mimeType": d.mimeType,
+ })
+
processedContents: List[str] = []
- for doc in documents:
- try:
- extracted = await self.documentExtractor.processFileData(
- doc.fileData,
- doc.fileName,
- doc.mimeType,
- prompt=f"Extract relevant content for {operationType}",
- documentId=doc.id,
- enableAI=True,
- )
- docContent: List[str] = []
- for contentItem in extracted.contents:
- if contentItem.data and contentItem.data.strip():
- docContent.append(contentItem.data)
+ try:
+ extractionResult = self.extractionService.extractDocuments(documentList, extractionOptions)
- if docContent:
- combinedDocContent = "\n\n".join(docContent)
- if (
- compressDocuments
- and len(combinedDocContent.encode("utf-8")) > 10000
- ):
- combinedDocContent = await self._compressContent(
- combinedDocContent, 10000, "document"
- )
- processedContents.append(
- f"Document: {doc.fileName}\n{combinedDocContent}"
- )
- except Exception as e:
- logger.warning(
- f"Error processing document {doc.fileName}: {str(e)}"
- )
- processedContents.append(
- f"Document: {doc.fileName}\n[Error processing document: {str(e)}]"
- )
+ def _partsToText(parts) -> str:
+ lines: List[str] = []
+ for p in parts:
+ if p.typeGroup in ("text", "table", "structure") and p.data and isinstance(p.data, str):
+ lines.append(p.data)
+ return "\n\n".join(lines)
+
+ if processIndividually and isinstance(extractionResult, list):
+            for ec in extractionResult:
+ try:
+ contentText = _partsToText(ec.parts)
+ if compressDocuments and len(contentText.encode("utf-8")) > 10000:
+ contentText = await self._compressContent(contentText, 10000, "document")
+ processedContents.append(contentText)
+ except Exception as e:
+ logger.warning(f"Error aggregating extracted content: {str(e)}")
+ processedContents.append("[Error aggregating content]")
+ else:
+ # pooled mode returns dict
+ parts = extractionResult.get("parts", []) if isinstance(extractionResult, dict) else []
+ contentText = _partsToText(parts)
+ if compressDocuments and len(contentText.encode("utf-8")) > 10000:
+ contentText = await self._compressContent(contentText, 10000, "document")
+ processedContents.append(contentText)
+ except Exception as e:
+ logger.warning(f"Error during extraction: {str(e)}")
+ processedContents.append("[Error during extraction]")
return "\n\n---\n\n".join(processedContents)
@@ -227,3 +249,267 @@ class AiService:
logger.warning(f"AI compression failed, using truncation: {str(e)}")
return content[:targetSize] + "... [truncated]"
+ # ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION =====
+
+ async def callAi(
+ self,
+ prompt: str,
+ documents: Optional[List[ChatDocument]] = None,
+ placeholders: Optional[Dict[str, str]] = None,
+ options: Optional[AiCallOptions] = None
+ ) -> str:
+ """
+ Unified AI call interface that automatically routes to appropriate handler.
+
+ Args:
+ prompt: The main prompt for the AI call
+ documents: Optional list of documents to process
+ placeholders: Optional dictionary of placeholder replacements for planning calls
+ options: AI call configuration options
+
+ Returns:
+ AI response as string
+
+ Raises:
+ Exception: If all available models fail
+ """
+ if options is None:
+ options = AiCallOptions()
+
+ # Auto-determine call type based on documents and operation type
+ call_type = self._determineCallType(documents, options.operationType)
+ options.callType = call_type
+
+ if call_type == "planning":
+ return await self._callAiPlanning(prompt, placeholders, options)
+ else:
+ return await self._callAiText(prompt, documents, options)
+
+ def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str:
+ """
+ Determine call type based on documents and operation type.
+
+ Criteria: no documents AND (operationType is "generate_plan" or "analyse_content") -> planning
+ """
+ has_documents = documents is not None and len(documents) > 0
+        is_planning_operation = operation_type in ("generate_plan", "analyse_content")
+
+ if not has_documents and is_planning_operation:
+ return "planning"
+ else:
+ return "text"
+
+ async def _callAiPlanning(
+ self,
+ prompt: str,
+ placeholders: Optional[Dict[str, str]],
+ options: AiCallOptions
+ ) -> str:
+ """
+ Handle planning calls with placeholder system and selective summarization.
+ """
+ # Get available models for planning (text + reasoning capabilities)
+ models = self._getModelsForOperation("planning", options)
+
+ for model in models:
+ try:
+ # Build full prompt with placeholders
+ full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders)
+
+ # Check size and reduce if needed
+                if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin):
+                    # rebuild from the raw template so placeholder substitution
+                    # still applies after the placeholders are reduced
+                    full_prompt = self._reducePlanningPrompt(prompt, placeholders, model, options)
+
+ # Make AI call using existing callAiText
+ result = await self.callAiText(
+ prompt=full_prompt,
+ documents=None,
+ options=options
+ )
+ return result
+
+ except Exception as e:
+ logger.warning(f"Planning model {model.name} failed: {e}")
+ continue
+
+ raise Exception("All planning models failed - check model availability and capabilities")
+
+ async def _callAiText(
+ self,
+ prompt: str,
+ documents: Optional[List[ChatDocument]],
+ options: AiCallOptions
+ ) -> str:
+ """
+ Handle text calls with document processing through ExtractionService.
+ """
+ # Get available models for text processing
+ models = self._getModelsForOperation("text", options)
+
+ for model in models:
+ try:
+ # Extract and process documents using ExtractionService
+ context = ""
+ if documents:
+ # Convert ChatDocument to documentList format for ExtractionService
+ documentList = [{
+ "id": d.id,
+ "bytes": d.fileData,
+ "fileName": d.fileName,
+ "mimeType": d.mimeType
+ } for d in documents]
+
+                    extracted_content = self.extractionService.extractDocuments(  # synchronous call
+ documentList=documentList,
+ options={
+ "prompt": prompt,
+ "operationType": options.operationType,
+ "processDocumentsIndividually": options.processDocumentsIndividually,
+                        "maxSize": options.maxContextBytes or int(model.maxTokens * 4 * 0.9),  # bytes; ~4 bytes per token heuristic
+ "chunkAllowed": not options.compressContext,
+ "mergeStrategy": {"groupBy": "typeGroup"}
+ }
+ )
+
+ # Get text content from extracted parts using typeGroup-aware processing
+ context = self._extractTextFromContentParts(extracted_content)
+
+ # Check size and reduce if needed
+ full_prompt = prompt + "\n\n" + context if context else prompt
+ if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin):
+ full_prompt = self._reduceTextPrompt(prompt, context, model, options)
+
+ # Make AI call using existing callAiText
+ result = await self.callAiText(
+ prompt=full_prompt,
+ documents=None,
+ options=options
+ )
+ return result
+
+ except Exception as e:
+ logger.warning(f"Text model {model.name} failed: {e}")
+ continue
+
+ raise Exception("All text models failed - check model availability and capabilities")
+
+ def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]:
+ """
+ Get models capable of handling the specific operation with capability filtering.
+ """
+ # For now, return a default model - this will be enhanced with actual model registry
+ default_model = ModelCapabilities(
+ name="default",
+ maxTokens=4000,
+ capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"],
+ costPerToken=0.001,
+ processingTime=1.0,
+ isAvailable=True
+ )
+ return [default_model]
+
+ def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str:
+ """
+ Build full prompt by replacing placeholders with their content.
+ Uses the new {{KEY:placeholder}} format.
+ """
+ if not placeholders:
+ return prompt
+
+ full_prompt = prompt
+ for placeholder, content in placeholders.items():
+ # Replace both old format {{placeholder}} and new format {{KEY:placeholder}}
+ full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content)
+ full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content)
+
+ return full_prompt
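+
+    # Illustrative substitution (a sketch): with placeholders={"goal": "ship v2"},
+    # both "Plan for {{goal}}" and "Plan for {{KEY:goal}}" become "Plan for ship v2".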
+
+ def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool:
+ """
+ Check if text exceeds model token limit with safety margin.
+ """
+ # Simple character-based estimation (4 chars per token)
+ estimated_tokens = len(text) // 4
+ max_tokens = int(model.maxTokens * (1 - safety_margin))
+ return estimated_tokens > max_tokens
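+
+    # Worked example (a sketch): maxTokens=4000 and safety_margin=0.1 give a budget
+    # of 3600 tokens, i.e. roughly 14,400 characters at ~4 chars per token.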
+
+    def _reducePlanningPrompt(
+        self,
+        prompt: str,
+        placeholders: Optional[Dict[str, str]],
+        model: ModelCapabilities,
+        options: AiCallOptions
+    ) -> str:
+        """
+        Reduce planning prompt size by shrinking long placeholder content while
+        preserving the prompt template, then re-substituting the placeholders.
+        """
+        if not placeholders:
+            return self._reduceText(prompt, 0.7)
+
+ # Reduce placeholders while preserving prompt
+ reduced_placeholders = {}
+ for placeholder, content in placeholders.items():
+ if len(content) > 1000: # Only reduce long content
+ reduction_factor = 0.7
+ reduced_content = self._reduceText(content, reduction_factor)
+ reduced_placeholders[placeholder] = reduced_content
+ else:
+ reduced_placeholders[placeholder] = content
+
+        return self._buildPromptWithPlaceholders(prompt, reduced_placeholders)
+
+ def _reduceTextPrompt(
+ self,
+ prompt: str,
+ context: str,
+ model: ModelCapabilities,
+ options: AiCallOptions
+ ) -> str:
+ """
+ Reduce text prompt size using typeGroup-aware chunking and merging.
+ """
+        max_size = int(model.maxTokens * 4 * (1 - options.safetyMargin))  # character budget, ~4 chars per token
+
+ if options.compressPrompt:
+ # Reduce both prompt and context
+ target_size = max_size
+ current_size = len(prompt) + len(context)
+ reduction_factor = (target_size * 0.7) / current_size
+
+ if reduction_factor < 1.0:
+ prompt = self._reduceText(prompt, reduction_factor)
+ context = self._reduceText(context, reduction_factor)
+ else:
+ # Only reduce context, preserve prompt integrity
+ max_context_size = max_size - len(prompt)
+ if len(context) > max_context_size:
+ reduction_factor = max_context_size / len(context)
+ context = self._reduceText(context, reduction_factor)
+
+ return prompt + "\n\n" + context if context else prompt
+
+    def _extractTextFromContentParts(self, extracted_content) -> str:
+        """
+        Extract text content from ExtractionService results: a per-document list,
+        the pooled dict, or a single ExtractedContent object.
+        """
+        if not extracted_content:
+            return ""
+        if isinstance(extracted_content, list):
+            texts = [self._extractTextFromContentParts(ec) for ec in extracted_content]
+            return "\n\n".join(t for t in texts if t)
+        if isinstance(extracted_content, dict):
+            parts = extracted_content.get("parts", [])
+        else:
+            parts = getattr(extracted_content, "parts", [])
+
+        text_parts = []
+        for part in parts:
+            if getattr(part, "typeGroup", None) in ("text", "table", "structure") and getattr(part, "data", None):
+                text_parts.append(part.data)
+
+        return "\n\n".join(text_parts)
+
+ def _reduceText(self, text: str, reduction_factor: float) -> str:
+ """
+ Reduce text size by the specified factor.
+ """
+ if reduction_factor >= 1.0:
+ return text
+
+ target_length = int(len(text) * reduction_factor)
+ return text[:target_length] + "... [reduced]"
+
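+# Usage sketch (assumes an initialized AiService `service` and a ChatDocument list `docs`):
+#   result = await service.callAi(
+#       "Summarize the attached reports",
+#       documents=docs,
+#       options=AiCallOptions(operationType="analyse_content"),
+#   )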
diff --git a/modules/services/serviceExtraction/__init__.py b/modules/services/serviceExtraction/__init__.py
new file mode 100644
index 00000000..9fe4727e
--- /dev/null
+++ b/modules/services/serviceExtraction/__init__.py
@@ -0,0 +1,5 @@
+from .mainServiceExtraction import ExtractionService
+
+__all__ = ["ExtractionService"]
+
+
diff --git a/modules/services/serviceExtraction/chunking/__init__.py b/modules/services/serviceExtraction/chunking/__init__.py
new file mode 100644
index 00000000..139597f9
--- /dev/null
+++ b/modules/services/serviceExtraction/chunking/__init__.py
@@ -0,0 +1,2 @@
+
+
diff --git a/modules/services/serviceExtraction/chunking/structure_chunker.py b/modules/services/serviceExtraction/chunking/structure_chunker.py
new file mode 100644
index 00000000..921b3d4d
--- /dev/null
+++ b/modules/services/serviceExtraction/chunking/structure_chunker.py
@@ -0,0 +1,59 @@
+from typing import Any, Dict, List
+import json
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Chunker
+
+
+class StructureChunker(Chunker):
+ def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
+ maxBytes = int(options.get("structureChunkSize", 40000))
+ data = part.data or ""
+ # best-effort: try JSON list/object bucketing; else fallback to line-based
+ chunks: List[Dict[str, Any]] = []
+ try:
+ obj = json.loads(data)
+ def emit(bucket: Any):
+ text = json.dumps(bucket, ensure_ascii=False)
+ chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)})
+ if isinstance(obj, list):
+ bucket: list[Any] = []
+ size = 0
+ for item in obj:
+ text = json.dumps(item, ensure_ascii=False)
+ s = len(text.encode('utf-8'))
+ if size + s > maxBytes and bucket:
+ emit(bucket)
+ bucket = [item]
+ size = s
+ else:
+ bucket.append(item)
+ size += s
+ if bucket:
+ emit(bucket)
+ else:
+ text = json.dumps(obj, ensure_ascii=False)
+ if len(text.encode('utf-8')) <= maxBytes:
+ emit(obj)
+ else:
+ # fallback to line chunking
+ raise ValueError("too large")
+ except Exception:
+ current: List[str] = []
+ size = 0
+ for line in data.split('\n'):
+ s = len(line.encode('utf-8')) + 1
+ if size + s > maxBytes and current:
+ text = '\n'.join(current)
+ chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)})
+ current = [line]
+ size = s
+ else:
+ current.append(line)
+ size += s
+ if current:
+ text = '\n'.join(current)
+ chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)})
+ return chunks
+
+
diff --git a/modules/services/serviceExtraction/chunking/table_chunker.py b/modules/services/serviceExtraction/chunking/table_chunker.py
new file mode 100644
index 00000000..9a614896
--- /dev/null
+++ b/modules/services/serviceExtraction/chunking/table_chunker.py
@@ -0,0 +1,28 @@
+from typing import Any, Dict, List
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Chunker
+
+
+class TableChunker(Chunker):
+ def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
+ maxBytes = int(options.get("tableChunkSize", 40000))
+ chunks: List[Dict[str, Any]] = []
+ current: List[str] = []
+ size = 0
+ for line in part.data.split('\n'):
+ lineSize = len(line.encode('utf-8')) + 1
+ if size + lineSize > maxBytes and current:
+ data = '\n'.join(current)
+ chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)})
+ current = [line]
+ size = lineSize
+ else:
+ current.append(line)
+ size += lineSize
+ if current:
+ data = '\n'.join(current)
+ chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)})
+ return chunks
+
+
diff --git a/modules/services/serviceExtraction/chunking/text_chunker.py b/modules/services/serviceExtraction/chunking/text_chunker.py
new file mode 100644
index 00000000..69218837
--- /dev/null
+++ b/modules/services/serviceExtraction/chunking/text_chunker.py
@@ -0,0 +1,28 @@
+from typing import Any, Dict, List
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Chunker
+
+
+class TextChunker(Chunker):
+ def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
+ maxBytes = int(options.get("textChunkSize", 40000))
+ chunks: List[Dict[str, Any]] = []
+ current: List[str] = []
+ size = 0
+ for line in part.data.split('\n'):
+ lineSize = len(line.encode('utf-8')) + 1
+ if size + lineSize > maxBytes and current:
+ data = '\n'.join(current)
+ chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)})
+ current = [line]
+ size = lineSize
+ else:
+ current.append(line)
+ size += lineSize
+ if current:
+ data = '\n'.join(current)
+ chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)})
+ return chunks
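+
+# Each chunk is a plain dict, e.g. {"data": "line1\nline2", "size": 11, "order": 0};
+# poolAndLimit later wraps kept chunks back into ContentPart objects.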
+
+
diff --git a/modules/services/serviceExtraction/formats/__init__.py b/modules/services/serviceExtraction/formats/__init__.py
new file mode 100644
index 00000000..139597f9
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/__init__.py
@@ -0,0 +1,2 @@
+
+
diff --git a/modules/services/serviceExtraction/formats/binary_extractor.py b/modules/services/serviceExtraction/formats/binary_extractor.py
new file mode 100644
index 00000000..1c201c36
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/binary_extractor.py
@@ -0,0 +1,25 @@
+from typing import Any, Dict, List
+import base64
+
+from ..utils import makeId
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Extractor
+
+
+class BinaryExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return True
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "application/octet-stream"
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="binary",
+ typeGroup="binary",
+ mimeType=mimeType,
+ data=base64.b64encode(fileBytes).decode("utf-8"),
+ metadata={"size": len(fileBytes), "warning": "Unsupported file type"}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/csv_extractor.py b/modules/services/serviceExtraction/formats/csv_extractor.py
new file mode 100644
index 00000000..db3cf969
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/csv_extractor.py
@@ -0,0 +1,26 @@
+from typing import Any, Dict, List
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+from ..subRegistry import Extractor
+
+
+class CsvExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return mimeType == "text/csv" or (fileName or "").lower().endswith(".csv")
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "text/csv"
+ data = fileBytes.decode("utf-8", errors="replace")
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="main",
+ typeGroup="table",
+ mimeType=mimeType,
+ data=data,
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/docx_extractor.py b/modules/services/serviceExtraction/formats/docx_extractor.py
new file mode 100644
index 00000000..6cb75716
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/docx_extractor.py
@@ -0,0 +1,89 @@
+from typing import Any, Dict, List
+import io
+
+from ..utils import makeId
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Extractor
+
+
+class DocxExtractor(Extractor):
+ def __init__(self):
+ self._loaded = False
+ self._haveLibs = False
+
+ def _load(self):
+ if self._loaded:
+ return
+ self._loaded = True
+ try:
+ global docx
+ import docx # python-docx
+ self._haveLibs = True
+ except Exception:
+ self._haveLibs = False
+
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" or (fileName or "").lower().endswith(".docx")
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ self._load()
+ parts: List[ContentPart] = []
+ rootId = makeId()
+ parts.append(ContentPart(
+ id=rootId,
+ parentId=None,
+ label="docx",
+ typeGroup="container",
+ mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ data="",
+ metadata={"size": len(fileBytes)}
+ ))
+
+ if not self._haveLibs:
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label="binary",
+ typeGroup="binary",
+ mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
+ data="",
+ metadata={"size": len(fileBytes), "warning": "DOCX lib not available"}
+ ))
+ return parts
+
+ with io.BytesIO(fileBytes) as buf:
+ d = docx.Document(buf)
+ # paragraphs
+ for i, para in enumerate(d.paragraphs):
+ text = para.text or ""
+ if text.strip():
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label=f"p_{i+1}",
+ typeGroup="text",
+ mimeType="text/plain",
+ data=text,
+ metadata={"size": len(text.encode('utf-8'))}
+ ))
+ # tables → CSV rows
+ for ti, table in enumerate(d.tables):
+ rows: list[str] = []
+ for row in table.rows:
+ cells = [ (cell.text or "").replace('"', '""') for cell in row.cells ]
+ rows.append(",".join([f'"{c}"' for c in cells]))
+ csvData = "\n".join(rows)
+ if csvData:
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label=f"table_{ti+1}",
+ typeGroup="table",
+ mimeType="text/csv",
+ data=csvData,
+ metadata={"size": len(csvData.encode('utf-8'))}
+ ))
+
+ return parts
+
+
diff --git a/modules/services/serviceExtraction/formats/html_extractor.py b/modules/services/serviceExtraction/formats/html_extractor.py
new file mode 100644
index 00000000..6c49c50c
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/html_extractor.py
@@ -0,0 +1,30 @@
+from typing import Any, Dict, List
+from bs4 import BeautifulSoup
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+from ..subRegistry import Extractor
+
+
+class HtmlExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return mimeType == "text/html" or (fileName or "").lower().endswith((".html", ".htm"))
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "text/html"
+ text = fileBytes.decode("utf-8", errors="replace")
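+        # validate that the HTML parses; the raw markup is passed through either way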
+ try:
+ BeautifulSoup(text, "html.parser")
+ except Exception:
+ pass
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="main",
+ typeGroup="structure",
+ mimeType=mimeType,
+ data=text,
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/image_extractor.py b/modules/services/serviceExtraction/formats/image_extractor.py
new file mode 100644
index 00000000..296eb50b
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/image_extractor.py
@@ -0,0 +1,25 @@
+from typing import Any, Dict, List
+import base64
+
+from ..utils import makeId
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Extractor
+
+
+class ImageExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return (mimeType or "").startswith("image/")
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "image/unknown"
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="image",
+ typeGroup="image",
+ mimeType=mimeType,
+ data=base64.b64encode(fileBytes).decode("utf-8"),
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/json_extractor.py b/modules/services/serviceExtraction/formats/json_extractor.py
new file mode 100644
index 00000000..456eb08e
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/json_extractor.py
@@ -0,0 +1,31 @@
+from typing import Any, Dict, List
+import json
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+from ..subRegistry import Extractor
+
+
+class JsonExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return mimeType == "application/json" or (fileName or "").lower().endswith(".json")
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "application/json"
+ text = fileBytes.decode("utf-8", errors="replace")
+ # verify JSON is well-formed; fall back to text if not
+ try:
+ json.loads(text)
+ except Exception:
+ pass
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="main",
+ typeGroup="structure",
+ mimeType=mimeType,
+ data=text,
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/pdf_extractor.py b/modules/services/serviceExtraction/formats/pdf_extractor.py
new file mode 100644
index 00000000..4d0d8058
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/pdf_extractor.py
@@ -0,0 +1,110 @@
+from typing import Any, Dict, List
+import base64
+import io
+
+from ..utils import makeId
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Extractor
+
+
+class PdfExtractor(Extractor):
+ def __init__(self):
+ self._loaded = False
+ self._haveLibs = False
+
+ def _load(self):
+ if self._loaded:
+ return
+ self._loaded = True
+ try:
+ global PyPDF2, fitz
+ import PyPDF2
+ import fitz # PyMuPDF
+ self._haveLibs = True
+ except Exception:
+ self._haveLibs = False
+
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return mimeType == "application/pdf" or (fileName or "").lower().endswith(".pdf")
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ self._load()
+ parts: List[ContentPart] = []
+ rootId = makeId()
+ parts.append(ContentPart(
+ id=rootId,
+ parentId=None,
+ label="pdf",
+ typeGroup="container",
+ mimeType="application/pdf",
+ data="",
+ metadata={"size": len(fileBytes)}
+ ))
+
+ if not self._haveLibs:
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label="binary",
+ typeGroup="binary",
+ mimeType="application/pdf",
+ data=base64.b64encode(fileBytes).decode("utf-8"),
+ metadata={"size": len(fileBytes), "warning": "PDF libs not available"}
+ ))
+ return parts
+
+ # Extract text per page with PyPDF2
+ try:
+ with io.BytesIO(fileBytes) as buf:
+ reader = PyPDF2.PdfReader(buf)
+ for i, page in enumerate(reader.pages):
+ try:
+ text = page.extract_text() or ""
+ if text.strip():
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label=f"page_{i+1}",
+ typeGroup="text",
+ mimeType="text/plain",
+ data=text,
+ metadata={"pages": 1, "pageIndex": i, "size": len(text.encode('utf-8'))}
+ ))
+ except Exception:
+ continue
+ except Exception:
+ pass
+
+ # Extract images with PyMuPDF
+ try:
+ with io.BytesIO(fileBytes) as buf2:
+ doc = fitz.open(stream=buf2, filetype="pdf")
+ for i in range(len(doc)):
+ page = doc[i]
+ images = page.get_images(full=True)
+ for j, img in enumerate(images):
+ try:
+ xref = img[0]
+ baseImage = doc.extract_image(xref)
+ if baseImage:
+ imgBytes = baseImage.get("image", b"")
+ ext = baseImage.get("ext", "png")
+ if imgBytes:
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label=f"image_{i+1}_{j}",
+ typeGroup="image",
+ mimeType=f"image/{ext}",
+ data=base64.b64encode(imgBytes).decode("utf-8"),
+ metadata={"pageIndex": i, "size": len(imgBytes)}
+ ))
+ except Exception:
+ continue
+ doc.close()
+ except Exception:
+ pass
+
+ return parts
+
+
diff --git a/modules/services/serviceExtraction/formats/text_extractor.py b/modules/services/serviceExtraction/formats/text_extractor.py
new file mode 100644
index 00000000..5099d04c
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/text_extractor.py
@@ -0,0 +1,26 @@
+from typing import Any, Dict, List
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+from ..subRegistry import Extractor
+
+
+class TextExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+        return mimeType in ("text/plain", "text/markdown") or (fileName or "").lower().endswith((".txt", ".md"))
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "text/plain"
+ data = fileBytes.decode("utf-8", errors="replace")
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="main",
+ typeGroup="text",
+ mimeType=mimeType,
+ data=data,
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/formats/xlsx_extractor.py b/modules/services/serviceExtraction/formats/xlsx_extractor.py
new file mode 100644
index 00000000..577b0776
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/xlsx_extractor.py
@@ -0,0 +1,93 @@
+from typing import Any, Dict, List
+import io
+from datetime import datetime
+
+from ..utils import makeId
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..subRegistry import Extractor
+
+
+class XlsxExtractor(Extractor):
+ def __init__(self):
+ self._loaded = False
+ self._haveLibs = False
+
+ def _load(self):
+ if self._loaded:
+ return
+ self._loaded = True
+ try:
+ global openpyxl
+ import openpyxl
+ self._haveLibs = True
+ except Exception:
+ self._haveLibs = False
+
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ mt = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
+ return mimeType == mt or (fileName or "").lower().endswith((".xlsx", ".xlsm"))
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ self._load()
+ parts: List[ContentPart] = []
+ rootId = makeId()
+ parts.append(ContentPart(
+ id=rootId,
+ parentId=None,
+ label="xlsx",
+ typeGroup="container",
+ mimeType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ data="",
+ metadata={"size": len(fileBytes)}
+ ))
+
+ if not self._haveLibs:
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label="binary",
+ typeGroup="binary",
+ mimeType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ data="",
+ metadata={"size": len(fileBytes), "warning": "openpyxl not available"}
+ ))
+ return parts
+
+ with io.BytesIO(fileBytes) as buf:
+ wb = openpyxl.load_workbook(buf, data_only=True)
+ for sheetName in wb.sheetnames:
+ ws = wb[sheetName]
+ # extract rectangular data region by min/max
+ min_row = ws.min_row
+ max_row = ws.max_row
+ min_col = ws.min_column
+ max_col = ws.max_column
+ lines: list[str] = []
+ for r in range(min_row, max_row + 1):
+ cells: list[str] = []
+ for c in range(min_col, max_col + 1):
+ cell = ws.cell(row=r, column=c)
+ v = cell.value
+ if v is None:
+ cells.append("")
+ elif isinstance(v, (int, float)):
+ cells.append(str(v))
+ elif isinstance(v, datetime):
+ cells.append(v.strftime("%Y-%m-%d %H:%M:%S"))
+                    else:
+                        # escape embedded quotes; a separate variable avoids backslashes
+                        # inside the f-string expression (a SyntaxError before Python 3.12)
+                        escaped = str(v).replace('"', '""')
+                        cells.append(f'"{escaped}"')
+ lines.append(",".join(cells))
+ csvData = "\n".join(lines)
+ parts.append(ContentPart(
+ id=makeId(),
+ parentId=rootId,
+ label=f"sheet_{sheetName}",
+ typeGroup="table",
+ mimeType="text/csv",
+ data=csvData,
+ metadata={"sheet": sheetName, "size": len(csvData.encode('utf-8'))}
+ ))
+
+ return parts
+
+
diff --git a/modules/services/serviceExtraction/formats/xml_extractor.py b/modules/services/serviceExtraction/formats/xml_extractor.py
new file mode 100644
index 00000000..7067924b
--- /dev/null
+++ b/modules/services/serviceExtraction/formats/xml_extractor.py
@@ -0,0 +1,30 @@
+from typing import Any, Dict, List
+import xml.etree.ElementTree as ET
+
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+from ..subRegistry import Extractor
+
+
+class XmlExtractor(Extractor):
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+        return mimeType in ("application/xml", "text/xml") or (fileName or "").lower().endswith((".xml", ".rss", ".atom"))
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]:
+ mimeType = context.get("mimeType") or "application/xml"
+ text = fileBytes.decode("utf-8", errors="replace")
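+        # verify the XML is well-formed; the raw text is passed through either way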
+ try:
+ ET.fromstring(text)
+ except Exception:
+ pass
+ return [ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="main",
+ typeGroup="structure",
+ mimeType=mimeType,
+ data=text,
+ metadata={"size": len(fileBytes)}
+ )]
+
+
diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py
new file mode 100644
index 00000000..1f6f36e5
--- /dev/null
+++ b/modules/services/serviceExtraction/mainServiceExtraction.py
@@ -0,0 +1,57 @@
+from typing import Any, Dict, List, Optional
+import uuid
+
+from .subRegistry import ExtractorRegistry, ChunkerRegistry
+from .subPipeline import runExtraction, poolAndLimit, applyAiIfRequested
+from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart
+
+
+class ExtractionService:
+ def __init__(self):
+ self._extractorRegistry = ExtractorRegistry()
+ self._chunkerRegistry = ChunkerRegistry()
+
+ def extractDocuments(self, documentList: List[Dict[str, Any]], options: Dict[str, Any]) -> Any:
+ processIndividually = options.get("processDocumentsIndividually", True)
+
+ if processIndividually:
+ results: List[ExtractedContent] = []
+ for doc in documentList:
+ ec = runExtraction(
+ extractorRegistry=self._extractorRegistry,
+ chunkerRegistry=self._chunkerRegistry,
+ documentBytes=doc.get("bytes"),
+ fileName=doc.get("fileName"),
+ mimeType=doc.get("mimeType"),
+ options=options
+ )
+ ec = applyAiIfRequested(ec, options)
+ results.append(ec)
+ return results
+ else:
+ allParts: List[ContentPart] = []
+ for doc in documentList:
+ ec = runExtraction(
+ extractorRegistry=self._extractorRegistry,
+ chunkerRegistry=self._chunkerRegistry,
+ documentBytes=doc.get("bytes"),
+ fileName=doc.get("fileName"),
+ mimeType=doc.get("mimeType"),
+ options=options
+ )
+ for p in ec.parts:
+ if "documentId" not in p.metadata:
+ p.metadata["documentId"] = doc.get("id") or str(uuid.uuid4())
+ allParts.extend(ec.parts)
+
+ pooled = poolAndLimit(allParts, self._chunkerRegistry, options)
+ # In pooled mode we return a dict containing pooled parts and an optional AI output
+ pooledResult: Dict[str, Any] = {
+ "parts": pooled,
+ "summary": {"documents": len(documentList)}
+ }
+ aiOut = applyAiIfRequested(ExtractedContent(id=str(uuid.uuid4()), parts=pooled, summary=None), options)
+ pooledResult["ai"] = aiOut.summary if isinstance(aiOut, ExtractedContent) else aiOut
+ return pooledResult
+
+
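+# Usage sketch (illustrative names and bytes):
+#   service = ExtractionService()
+#   results = service.extractDocuments(
+#       [{"id": "1", "bytes": pdfBytes, "fileName": "report.pdf", "mimeType": "application/pdf"}],
+#       {"processDocumentsIndividually": True, "maxSize": 16000, "chunkAllowed": True},
+#   )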
diff --git a/modules/services/serviceExtraction/merging/__init__.py b/modules/services/serviceExtraction/merging/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/modules/services/serviceExtraction/merging/default_merger.py b/modules/services/serviceExtraction/merging/default_merger.py
new file mode 100644
index 00000000..ceab6635
--- /dev/null
+++ b/modules/services/serviceExtraction/merging/default_merger.py
@@ -0,0 +1,11 @@
+from typing import Any, Dict, List
+from modules.datamodels.datamodelExtraction import ContentPart
+
+
+class DefaultMerger:
+ def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
+ """
+ Default merger that passes through parts unchanged.
+ Used for image, binary, metadata, container typeGroups.
+ """
+ return parts
diff --git a/modules/services/serviceExtraction/merging/table_merger.py b/modules/services/serviceExtraction/merging/table_merger.py
new file mode 100644
index 00000000..04be404e
--- /dev/null
+++ b/modules/services/serviceExtraction/merging/table_merger.py
@@ -0,0 +1,152 @@
+from typing import Any, Dict, List
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+
+
+class TableMerger:
+ def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
+ """
+ Merge table parts based on strategy.
+ Strategy options:
+ - groupBy: "parentId" (default), "documentId", "sheet", "none"
+ - maxSize: maximum size per merged part
+ - combineSheets: bool - whether to combine multiple sheets into one table
+ """
+ if not parts:
+ return parts
+
+ groupBy = strategy.get("groupBy", "parentId")
+ maxSize = strategy.get("maxSize", 0)
+ combineSheets = strategy.get("combineSheets", False)
+
+ # Group parts
+ groups = self._groupParts(parts, groupBy, combineSheets)
+
+ merged: List[ContentPart] = []
+ for groupKey, groupParts in groups.items():
+ if maxSize > 0:
+ merged.extend(self._mergeWithSizeLimit(groupParts, maxSize, groupKey))
+ else:
+ merged.extend(self._mergeGroup(groupParts, groupKey))
+
+ return merged
+
+ def _groupParts(self, parts: List[ContentPart], groupBy: str, combineSheets: bool) -> Dict[str, List[ContentPart]]:
+ groups: Dict[str, List[ContentPart]] = {}
+
+ for part in parts:
+ if part.typeGroup != "table":
+ # Non-table parts go in their own group
+ key = f"nontable_{part.id}"
+ if key not in groups:
+ groups[key] = []
+ groups[key].append(part)
+ continue
+
+ if groupBy == "parentId":
+ key = part.parentId or "root"
+ elif groupBy == "documentId":
+ key = part.metadata.get("documentId", "unknown")
+ elif groupBy == "sheet" and not combineSheets:
+ key = part.metadata.get("sheet", "unknown")
+ else: # "none" or combineSheets=True
+ key = "all_tables"
+
+ if key not in groups:
+ groups[key] = []
+ groups[key].append(part)
+
+ return groups
+
+ def _mergeGroup(self, parts: List[ContentPart], groupKey: str) -> List[ContentPart]:
+ if not parts:
+ return []
+ if len(parts) == 1:
+ return parts
+
+ # For tables, we typically keep them separate unless explicitly combining
+ # But we can add metadata about the group
+ for i, part in enumerate(parts):
+ part.metadata["groupKey"] = groupKey
+ part.metadata["groupIndex"] = i
+ part.metadata["groupSize"] = len(parts)
+
+ return parts
+
+ def _mergeWithSizeLimit(self, parts: List[ContentPart], maxSize: int, groupKey: str) -> List[ContentPart]:
+ if not parts:
+ return []
+
+ # For tables, we typically don't merge across different tables
+ # Instead, we chunk individual large tables
+ merged: List[ContentPart] = []
+
+ for part in parts:
+ partSize = part.metadata.get("size", 0)
+
+ if partSize <= maxSize:
+ # Part fits within limit
+ part.metadata["groupKey"] = groupKey
+ merged.append(part)
+ else:
+ # Chunk the large table
+ chunks = self._chunkTable(part, maxSize)
+ merged.extend(chunks)
+
+ return merged
+
+ def _chunkTable(self, part: ContentPart, maxSize: int) -> List[ContentPart]:
+ """Chunk a large table by rows while preserving CSV structure."""
+ lines = part.data.split('\n')
+ if not lines:
+ return [part]
+
+ chunks: List[ContentPart] = []
+ currentChunk: List[str] = []
+ currentSize = 0
+
+ for line in lines:
+ lineSize = len(line.encode('utf-8')) + 1 # +1 for newline
+
+ if currentSize + lineSize > maxSize and currentChunk:
+ # Flush current chunk
+ chunkData = '\n'.join(currentChunk)
+ chunks.append(ContentPart(
+ id=makeId(),
+ parentId=part.parentId,
+ label=f"{part.label}_chunk_{len(chunks)}",
+ typeGroup="table",
+ mimeType=part.mimeType,
+ data=chunkData,
+ metadata={
+ "size": len(chunkData.encode('utf-8')),
+ "chunk": True,
+ "originalPart": part.id,
+ "chunkIndex": len(chunks)
+ }
+ ))
+ currentChunk = [line]
+ currentSize = lineSize
+ else:
+ currentChunk.append(line)
+ currentSize += lineSize
+
+ # Flush remaining chunk
+ if currentChunk:
+ chunkData = '\n'.join(currentChunk)
+ chunks.append(ContentPart(
+ id=makeId(),
+ parentId=part.parentId,
+ label=f"{part.label}_chunk_{len(chunks)}",
+ typeGroup="table",
+ mimeType=part.mimeType,
+ data=chunkData,
+ metadata={
+ "size": len(chunkData.encode('utf-8')),
+ "chunk": True,
+ "originalPart": part.id,
+ "chunkIndex": len(chunks)
+ }
+ ))
+
+ return chunks
diff --git a/modules/services/serviceExtraction/merging/text_merger.py b/modules/services/serviceExtraction/merging/text_merger.py
new file mode 100644
index 00000000..bb9e850d
--- /dev/null
+++ b/modules/services/serviceExtraction/merging/text_merger.py
@@ -0,0 +1,136 @@
+from typing import Any, Dict, List
+from modules.datamodels.datamodelExtraction import ContentPart
+from ..utils import makeId
+
+
+class TextMerger:
+ def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
+ """
+ Merge text parts based on strategy.
+ Strategy options:
+ - groupBy: "parentId" (default), "documentId", "none"
+ - orderBy: "label", "pageIndex", "sheetIndex", "none"
+ - maxSize: maximum size per merged part
+ """
+ if not parts:
+ return parts
+
+ groupBy = strategy.get("groupBy", "parentId")
+ orderBy = strategy.get("orderBy", "label")
+ maxSize = strategy.get("maxSize", 0)
+
+ # Group parts
+ groups = self._groupParts(parts, groupBy)
+
+ merged: List[ContentPart] = []
+ for groupKey, groupParts in groups.items():
+ # Sort within group
+ sortedParts = self._sortParts(groupParts, orderBy)
+
+ # Merge respecting maxSize
+ if maxSize > 0:
+ merged.extend(self._mergeWithSizeLimit(sortedParts, maxSize))
+ else:
+ merged.extend(self._mergeGroup(sortedParts, groupKey))
+
+ return merged
+
+ def _groupParts(self, parts: List[ContentPart], groupBy: str) -> Dict[str, List[ContentPart]]:
+ groups: Dict[str, List[ContentPart]] = {}
+
+ for part in parts:
+ if part.typeGroup != "text":
+ # Non-text parts go in their own group
+ key = f"nontext_{part.id}"
+ if key not in groups:
+ groups[key] = []
+ groups[key].append(part)
+ continue
+
+ if groupBy == "parentId":
+ key = part.parentId or "root"
+ elif groupBy == "documentId":
+ key = part.metadata.get("documentId", "unknown")
+ else: # "none"
+ key = "all"
+
+ if key not in groups:
+ groups[key] = []
+ groups[key].append(part)
+
+ return groups
+
+ def _sortParts(self, parts: List[ContentPart], orderBy: str) -> List[ContentPart]:
+ if orderBy == "pageIndex":
+ return sorted(parts, key=lambda p: p.metadata.get("pageIndex", 0))
+ elif orderBy == "sheetIndex":
+ return sorted(parts, key=lambda p: p.metadata.get("sheetIndex", 0))
+ elif orderBy == "label":
+ return sorted(parts, key=lambda p: p.label)
+ else: # "none"
+ return parts
+
+ def _mergeGroup(self, parts: List[ContentPart], groupKey: str) -> List[ContentPart]:
+ if not parts:
+ return []
+ if len(parts) == 1:
+ return parts
+
+ # Merge all text parts in group
+ textParts = [p for p in parts if p.typeGroup == "text"]
+ nonTextParts = [p for p in parts if p.typeGroup != "text"]
+
+ if not textParts:
+ return nonTextParts
+
+ # Combine text data
+ combinedData = "\n".join([p.data for p in textParts])
+ totalSize = sum(p.metadata.get("size", 0) for p in textParts)
+
+ mergedPart = ContentPart(
+ id=makeId(),
+ parentId=textParts[0].parentId,
+ label=f"merged_{groupKey}",
+ typeGroup="text",
+ mimeType="text/plain",
+ data=combinedData,
+ metadata={
+ "size": totalSize,
+ "merged": len(textParts),
+ "originalParts": [p.id for p in textParts]
+ }
+ )
+
+ return [mergedPart] + nonTextParts
+
+ def _mergeWithSizeLimit(self, parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
+ if not parts:
+ return []
+
+ textParts = [p for p in parts if p.typeGroup == "text"]
+ nonTextParts = [p for p in parts if p.typeGroup != "text"]
+
+ if not textParts:
+ return nonTextParts
+
+ merged: List[ContentPart] = []
+ currentGroup: List[ContentPart] = []
+ currentSize = 0
+
+ for part in textParts:
+ partSize = part.metadata.get("size", 0)
+
+ if currentSize + partSize > maxSize and currentGroup:
+ # Flush current group
+ merged.extend(self._mergeGroup(currentGroup, f"chunk_{len(merged)}"))
+ currentGroup = [part]
+ currentSize = partSize
+ else:
+ currentGroup.append(part)
+ currentSize += partSize
+
+ # Flush remaining group
+ if currentGroup:
+ merged.extend(self._mergeGroup(currentGroup, f"chunk_{len(merged)}"))
+
+ return merged + nonTextParts
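+
+# Strategy sketch: merge page texts per document in page order, capped at 40 KB per part:
+#   TextMerger().merge(parts, {"groupBy": "documentId", "orderBy": "pageIndex", "maxSize": 40000})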
diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py
new file mode 100644
index 00000000..4441e9c4
--- /dev/null
+++ b/modules/services/serviceExtraction/subPipeline.py
@@ -0,0 +1,186 @@
+from typing import Any, Dict, List
+
+from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart
+from .utils import makeId
+from .subRegistry import ExtractorRegistry, ChunkerRegistry
+from .merging.text_merger import TextMerger
+from .merging.table_merger import TableMerger
+from .merging.default_merger import DefaultMerger
+
+
+def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ExtractedContent:
+ extractor = extractorRegistry.resolve(mimeType, fileName)
+ if extractor is None:
+ # fallback: single binary part
+ part = ContentPart(
+ id=makeId(),
+ parentId=None,
+ label="file",
+ typeGroup="binary",
+ mimeType=mimeType or "application/octet-stream",
+ data="",
+ metadata={"warning": "No extractor registered"}
+ )
+ return ExtractedContent(id=makeId(), parts=[part])
+
+ parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options})
+ # Optional merge step
+ mergeStrategy = options.get("mergeStrategy", {})
+ if mergeStrategy:
+ parts = _mergeParts(parts, mergeStrategy)
+ return ExtractedContent(id=makeId(), parts=parts)
+
+
+def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]:
+ maxSize = int(options.get("maxSize", 0) or 0)
+ chunkAllowed = bool(options.get("chunkAllowed", False))
+ mergeStrategy = options.get("mergeStrategy", {})
+
+ if maxSize <= 0:
+ # Still apply merging if strategy provided
+ if mergeStrategy:
+ return _applyMerging(parts, mergeStrategy)
+ return parts
+
+ # First, try to fit within size limit
+ current = 0
+ kept: List[ContentPart] = []
+ remaining: List[ContentPart] = []
+
+ for p in parts:
+ size = int(p.metadata.get("size", 0) or 0)
+ if current + size <= maxSize:
+ kept.append(p)
+ current += size
+ else:
+ remaining.append(p)
+
+ # If we have remaining parts and chunking is allowed, try chunking
+ if remaining and chunkAllowed:
+ for p in remaining:
+ if p.typeGroup in ("text", "table", "structure"):
+ chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options)
+ for ch in chunks:
+ chSize = int(ch.get("size", 0) or 0)
+ if current + chSize <= maxSize:
+ kept.append(ContentPart(
+ id=makeId(),
+ parentId=p.id,
+ label=f"chunk_{ch.get('order', 0)}",
+ typeGroup=p.typeGroup,
+ mimeType=p.mimeType,
+ data=ch.get("data", ""),
+ metadata={"size": chSize, "chunk": True}
+ ))
+ current += chSize
+ else:
+ break
+
+ # Apply merging strategy if provided
+ if mergeStrategy:
+ kept = _applyMerging(kept, mergeStrategy)
+
+ # Re-check size after merging
+ totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept)
+ if totalSize > maxSize and mergeStrategy.get("maxSize"):
+ # Apply size limit to merged parts
+ kept = _applySizeLimit(kept, maxSize)
+
+ return kept
+
+
+def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]:
+ """Apply merging strategy to parts."""
+ textMerger = TextMerger()
+ tableMerger = TableMerger()
+ defaultMerger = DefaultMerger()
+
+ # Group by typeGroup
+ textParts = [p for p in parts if p.typeGroup == "text"]
+ tableParts = [p for p in parts if p.typeGroup == "table"]
+ structureParts = [p for p in parts if p.typeGroup == "structure"]
+ otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")]
+
+ merged: List[ContentPart] = []
+
+ if textParts:
+ merged.extend(textMerger.merge(textParts, strategy))
+ if tableParts:
+ merged.extend(tableMerger.merge(tableParts, strategy))
+ if structureParts:
+ # For now, treat structure like text
+ merged.extend(textMerger.merge(structureParts, strategy))
+ if otherParts:
+ merged.extend(defaultMerger.merge(otherParts, strategy))
+
+ return merged
+
+
+def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]:
+ """Apply size limit by prioritizing parts and truncating if necessary."""
+ # Sort by priority: text first, then others
+ priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6}
+ sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99))
+
+ kept: List[ContentPart] = []
+ current_size = 0
+
+ for part in sorted_parts:
+ part_size = int(part.metadata.get("size", 0) or 0)
+ if current_size + part_size <= maxSize:
+ kept.append(part)
+ current_size += part_size
+ else:
+ # Try to truncate text parts
+ if part.typeGroup == "text" and part_size > 0:
+ remaining_size = maxSize - current_size
+ if remaining_size > 1000: # Only truncate if we have meaningful space
+ truncated_data = part.data[:remaining_size] # conservative: each UTF-8 character is at least one byte
+ truncated_part = ContentPart(
+ id=makeId(),
+ parentId=part.parentId,
+ label=f"{part.label}_truncated",
+ typeGroup=part.typeGroup,
+ mimeType=part.mimeType,
+ data=truncated_data,
+ metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True}
+ )
+ kept.append(truncated_part)
+ break
+
+ return kept
+
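+# Example (illustrative): with maxSize=10000 bytes and 6000 bytes already
+# kept, an oversized 8000-byte text part is truncated to the remaining
+# 4000 bytes (the headroom exceeds the 1000-byte minimum); with only
+# 500 bytes of headroom it would be dropped instead.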
+
+def applyAiIfRequested(extracted: ExtractedContent, options: Dict[str, Any]) -> ExtractedContent:
+ """
+ Apply AI processing if requested in options.
+ This is a placeholder for actual AI integration.
+ """
+ prompt = options.get("prompt")
+ operationType = options.get("operationType", "general")
+
+ if not prompt:
+ return extracted
+
+ # Placeholder AI processing based on operationType
+ if operationType == "analyse_content":
+ # Add analysis metadata to parts
+ for part in extracted.parts:
+ if part.typeGroup in ("text", "table", "structure"):
+ part.metadata["ai_processed"] = True
+ part.metadata["operation_type"] = operationType
+ elif operationType == "generate_plan":
+ # Add plan generation metadata
+ for part in extracted.parts:
+ if part.typeGroup == "text":
+ part.metadata["ai_processed"] = True
+ part.metadata["operation_type"] = operationType
+ elif operationType == "generate_content":
+ # Add content generation metadata
+ for part in extracted.parts:
+ part.metadata["ai_processed"] = True
+ part.metadata["operation_type"] = operationType
+
+ return extracted
+
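+
+if __name__ == "__main__":
+ # Minimal smoke test, illustrative only (run via
+ # `python -m modules.services.serviceExtraction.subPipeline`; assumes the
+ # built-in text extractor registered successfully).
+ _extractors = ExtractorRegistry()
+ _chunkers = ChunkerRegistry()
+ _extracted = runExtraction(_extractors, _chunkers, b"hello world", "note.txt", "text/plain", {})
+ _parts = poolAndLimit(_extracted.parts, _chunkers, {"maxSize": 1024, "chunkAllowed": True})
+ for _part in _parts:
+ print(_part.label, _part.typeGroup, _part.metadata.get("size"))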
+
diff --git a/modules/services/serviceExtraction/subRegistry.py b/modules/services/serviceExtraction/subRegistry.py
new file mode 100644
index 00000000..ceb9b1b5
--- /dev/null
+++ b/modules/services/serviceExtraction/subRegistry.py
@@ -0,0 +1,103 @@
+from typing import Any, Dict, Optional
+
+from .types import ContentPart
+
+
+class Extractor:
+ def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool:
+ return False
+
+ def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> list[ContentPart]:
+ raise NotImplementedError
+
+
+class Chunker:
+ def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]:
+ return []
+
+
+class ExtractorRegistry:
+ def __init__(self):
+ self._map: Dict[str, Extractor] = {}
+ self._fallback: Optional[Extractor] = None
+ # Register built-ins
+ try:
+ from .formats.text_extractor import TextExtractor
+ from .formats.csv_extractor import CsvExtractor
+ from .formats.json_extractor import JsonExtractor
+ from .formats.xml_extractor import XmlExtractor
+ from .formats.html_extractor import HtmlExtractor
+ from .formats.pdf_extractor import PdfExtractor
+ from .formats.docx_extractor import DocxExtractor
+ from .formats.xlsx_extractor import XlsxExtractor
+ from .formats.image_extractor import ImageExtractor
+ from .formats.binary_extractor import BinaryExtractor
+ self.register("text/plain", TextExtractor())
+ self.register("text/markdown", TextExtractor())
+ self.register("text/csv", CsvExtractor())
+ self.register("application/json", JsonExtractor())
+ self.register("application/xml", XmlExtractor())
+ self.register("text/html", HtmlExtractor())
+ self.register("application/pdf", PdfExtractor())
+ self.register("application/vnd.openxmlformats-officedocument.wordprocessingml.document", DocxExtractor())
+ self.register("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", XlsxExtractor())
+ # images
+ self.register("image/jpeg", ImageExtractor())
+ self.register("image/png", ImageExtractor())
+ self.register("image/gif", ImageExtractor())
+ # extension fallbacks
+ self.register("txt", TextExtractor())
+ self.register("md", TextExtractor())
+ self.register("csv", CsvExtractor())
+ self.register("json", JsonExtractor())
+ self.register("xml", XmlExtractor())
+ self.register("html", HtmlExtractor())
+ self.register("htm", HtmlExtractor())
+ self.register("pdf", PdfExtractor())
+ self.register("docx", DocxExtractor())
+ self.register("xlsx", XlsxExtractor())
+ self.register("xlsm", XlsxExtractor())
+ # fallback
+ self.setFallback(BinaryExtractor())
+ except Exception:
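+ # Best-effort registration: a missing optional dependency (e.g. a PDF
+ # or DOCX library) leaves that format unregistered instead of failing import.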
+ pass
+
+ def register(self, key: str, extractor: Extractor):
+ self._map[key] = extractor
+
+ def setFallback(self, extractor: Extractor):
+ self._fallback = extractor
+
+ def resolve(self, mimeType: str, fileName: str) -> Optional[Extractor]:
+ if mimeType in self._map:
+ return self._map[mimeType]
+ # simple extension fallback
+ if "." in fileName:
+ ext = fileName.lower().rsplit(".", 1)[-1]
+ if ext in self._map:
+ return self._map[ext]
+ return self._fallback
+
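+# Resolution order (illustrative): resolve("application/pdf", "report.pdf")
+# matches the MIME key first; resolve("", "notes.md") falls through to the
+# "md" extension key; anything else gets the BinaryExtractor fallback.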
+
+class ChunkerRegistry:
+ def __init__(self):
+ self._map: Dict[str, Chunker] = {}
+ self._noop = Chunker()
+ # Register default chunkers
+ try:
+ from .chunking.text_chunker import TextChunker
+ from .chunking.table_chunker import TableChunker
+ from .chunking.structure_chunker import StructureChunker
+ self.register("text", TextChunker())
+ self.register("table", TableChunker())
+ self.register("structure", StructureChunker())
+ except Exception:
+ pass
+
+ def register(self, typeGroup: str, chunker: Chunker):
+ self._map[typeGroup] = chunker
+
+ def resolve(self, typeGroup: str) -> Chunker:
+ return self._map.get(typeGroup, self._noop)
+
+
diff --git a/modules/services/serviceExtraction/utils/__init__.py b/modules/services/serviceExtraction/utils/__init__.py
new file mode 100644
index 00000000..a16d3f59
--- /dev/null
+++ b/modules/services/serviceExtraction/utils/__init__.py
@@ -0,0 +1,7 @@
+import uuid
+
+
+def makeId() -> str:
+ return str(uuid.uuid4())
+
+
diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py
index a3111b54..033a89b6 100644
--- a/modules/workflows/methods/methodAi.py
+++ b/modules/workflows/methods/methodAi.py
@@ -9,6 +9,7 @@ from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
@@ -32,7 +33,7 @@ class MethodAi(MethodBase):
Parameters:
aiPrompt (str): The AI prompt for processing
documentList (list, optional): List of document references to include in context
- expectedDocumentFormats (list, optional): Expected output formats with extension, mimeType, description
+ expectedDocumentFormat (dict, optional): Expected document output format with extension, mimeType, description keys
processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic')
includeMetadata (bool, optional): Whether to include metadata (default: True)
operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation'
@@ -46,7 +47,7 @@ class MethodAi(MethodBase):
documentList = parameters.get("documentList", [])
if isinstance(documentList, str):
documentList = [documentList]
- expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
+ expectedDocumentFormat = parameters.get("expectedDocumentFormat", "")
processingMode = parameters.get("processingMode", "basic")
includeMetadata = parameters.get("includeMetadata", True)
operationType = parameters.get("operationType", "general")
@@ -64,8 +65,7 @@ class MethodAi(MethodBase):
output_extension = ".txt" # Default
output_mime_type = "text/plain" # Default
- if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
- expected_format = expectedDocumentFormats[0]
- output_extension = expected_format.get("extension", ".txt")
- output_mime_type = expected_format.get("mimeType", "text/plain")
+ if expectedDocumentFormat:
+ output_extension = expectedDocumentFormat.get("extension", ".txt")
+ output_mime_type = expectedDocumentFormat.get("mimeType", "text/plain")
logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
@@ -205,26 +205,19 @@ class MethodAi(MethodBase):
output_format = output_extension.replace('.', '') or 'txt'
- # Build options from parameters
- options = {
- "process_type": "text",
- "operation_type": operationType,
- "priority": priority,
- "compress_prompt": processingMode != "detailed",
- "compress_documents": True,
- "process_documents_individually": True,
- "processing_mode": processingMode,
- "result_format_requested": output_format,
- "include_metadata": includeMetadata
- }
-
- # Add optional parameters if provided
- if maxCost is not None:
- options["max_cost"] = maxCost
- if maxProcessingTime is not None:
- options["max_processing_time"] = maxProcessingTime
- if requiredTags is not None:
- options["required_tags"] = requiredTags
+ # Build options using new AiCallOptions format
+ options = AiCallOptions(
+ operationType=operationType,
+ priority=priority,
+ compressPrompt=processingMode != "detailed",
+ compressContext=True,
+ processDocumentsIndividually=True,
+ processingMode=processingMode,
+ resultFormat=output_format,
+ maxCost=maxCost,
+ maxProcessingTime=maxProcessingTime,
+ requiredTags=requiredTags
+ )
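+ # Legacy dict keys map onto the new fields: compress_documents ->
+ # compressContext, result_format_requested -> resultFormat,
+ # max_cost -> maxCost, max_processing_time -> maxProcessingTime.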
result = await self.services.ai.callAi(
prompt=call_prompt,
@@ -260,19 +253,17 @@ class MethodAi(MethodBase):
result = await self.services.ai.callAi(
prompt=guardrail_prompt,
documents=context or None,
- options={
- "process_type": "text",
- "operation_type": "generate_content",
- "priority": "quality",
- "compress_prompt": False,
- "compress_documents": True,
- "process_documents_individually": True,
- "processing_mode": "detailed",
- "result_format_requested": "json",
- "include_metadata": False,
- "max_cost": 0.03,
- "max_processing_time": 30
- }
+ options=AiCallOptions(
+ operationType=OperationType.GENERATE_CONTENT,
+ priority=Priority.QUALITY,
+ compressPrompt=False,
+ compressContext=True,
+ processDocumentsIndividually=True,
+ processingMode="detailed",
+ resultFormat="json",
+ maxCost=0.03,
+ maxProcessingTime=30
+ )
)
except Exception:
result = cleaned # fallback to first attempt
diff --git a/modules/workflows/methods/methodDocument.py b/modules/workflows/methods/methodDocument.py
index 7f4239e6..61b4741b 100644
--- a/modules/workflows/methods/methodDocument.py
+++ b/modules/workflows/methods/methodDocument.py
@@ -10,6 +10,7 @@ from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult, ChatDocument
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
@@ -532,14 +533,13 @@ class MethodDocument(MethodBase):
formatted_content = await self.services.ai.callAi(
prompt=ai_prompt,
documents=None,
- options={
- "process_type": "text",
- "operation_type": "generate_content",
- "priority": "speed",
- "compress_prompt": True,
- "compress_documents": False,
- "max_cost": 0.02
- }
+ options=AiCallOptions(
+ operationType=OperationType.GENERATE_CONTENT,
+ priority=Priority.SPEED,
+ compressPrompt=True,
+ compressContext=False,
+ maxCost=0.02
+ )
)
if not formatted_content or formatted_content.strip() == "":
@@ -776,19 +776,17 @@ SOURCE DOCUMENT CONTENT:
aiReport = await self.services.ai.callAi(
prompt=aiPrompt,
documents=documents or None,
- options={
- "process_type": "text",
- "operation_type": "report_generation",
- "priority": "quality",
- "compress_prompt": False,
- "compress_documents": True,
- "process_documents_individually": True,
- "result_format_requested": "html",
- "include_metadata": includeMetadata,
- "processing_mode": "detailed",
- "max_cost": 0.08,
- "max_processing_time": 90
- }
+ options=AiCallOptions(
+ operationType=OperationType.GENERATE_CONTENT, # Using GENERATE_CONTENT for report generation
+ priority=Priority.QUALITY,
+ compressPrompt=False,
+ compressContext=True,
+ processDocumentsIndividually=True,
+ resultFormat="html",
+ processingMode="detailed",
+ maxCost=0.08,
+ maxProcessingTime=90
+ )
)
# If AI call fails, return error - AI is crucial for report generation
diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py
index c44bf405..02fca00b 100644
--- a/modules/workflows/methods/methodOutlook.py
+++ b/modules/workflows/methods/methodOutlook.py
@@ -12,7 +12,9 @@ import uuid
import requests
from modules.workflows.methods.methodBase import MethodBase, action
-from modules.datamodels.datamodelWorkflow import ActionResult, ChatDocument
+from modules.datamodels.datamodelWorkflow import ActionResult
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
+from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelUam import ConnectionStatus
logger = logging.getLogger(__name__)
@@ -1519,17 +1521,15 @@ class MethodOutlook(MethodBase):
composed_email = await self.services.ai.callAi(
prompt=ai_prompt,
documents=documents or None,
- options={
- "process_type": "text",
- "operation_type": "email_composition",
- "priority": "speed",
- "compress_prompt": True,
- "compress_documents": True,
- "process_documents_individually": False,
- "include_metadata": True,
- "max_cost": 0.02,
- "max_processing_time": 15
- }
+ options=AiCallOptions(
+ operationType=OperationType.GENERATE_CONTENT, # Using GENERATE_CONTENT for email composition
+ priority=Priority.SPEED,
+ compressPrompt=True,
+ compressContext=True,
+ processDocumentsIndividually=False,
+ maxCost=0.02,
+ maxProcessingTime=15
+ )
)
# Parse the AI response to ensure it's valid JSON
diff --git a/modules/workflows/methods/methodWeb.py b/modules/workflows/methods/methodWeb.py
index cd8a4afd..69ccee06 100644
--- a/modules/workflows/methods/methodWeb.py
+++ b/modules/workflows/methods/methodWeb.py
@@ -5,6 +5,7 @@ import json as _json
from typing import Any, Dict
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
from modules.datamodels.datamodelWeb import (
WebSearchRequest,
WebCrawlRequest,
@@ -278,16 +279,15 @@ class MethodWeb(MethodBase):
summary = await self.services.ai.callAi(
prompt=prompt,
documents=None,
- options={
- "process_type": "text",
- "operation_type": "analyse_content",
- "priority": "balanced",
- "compress_prompt": True,
- "compress_documents": False,
- "processing_mode": "advanced",
- "max_cost": 0.05,
- "max_processing_time": 30
- }
+ options=AiCallOptions(
+ operationType=OperationType.ANALYSE_CONTENT,
+ priority=Priority.BALANCED,
+ compressPrompt=True,
+ compressContext=False,
+ processingMode="advanced",
+ maxCost=0.05,
+ maxProcessingTime=30
+ )
)
summary = summary.strip()
except Exception:
diff --git a/modules/workflows/processing/handlingTasks.py b/modules/workflows/processing/handlingTasks.py
index f1fb7b94..fbd6163e 100644
--- a/modules/workflows/processing/handlingTasks.py
+++ b/modules/workflows/processing/handlingTasks.py
@@ -33,6 +33,20 @@ from modules.workflows.processing.promptFactory import (
createActionParameterPrompt,
createRefinementPrompt
)
+from modules.workflows.processing.promptFactoryPlaceholders import (
+ createTaskPlanningPromptTemplate,
+ createActionDefinitionPromptTemplate,
+ createActionSelectionPromptTemplate,
+ createActionParameterPromptTemplate,
+ createRefinementPromptTemplate,
+ createResultReviewPromptTemplate,
+ extractUserPrompt,
+ extractAvailableDocuments,
+ extractWorkflowHistory,
+ extractAvailableMethods,
+ extractUserLanguage,
+ extractReviewContent
+)
from modules.services.serviceDocument.mainServiceDocumentGeneration import DocumentGenerationService
from modules.workflows.processing.promptFactory import methods
from modules.workflows.processing.executionState import should_continue
@@ -117,16 +131,28 @@ class HandlingTasks:
}
)
- # Generate the task planning prompt
- task_planning_prompt = createTaskPlanningPrompt(task_planning_context, self.service)
+ # Generate the task planning prompt with placeholders
+ task_planning_prompt_template = createTaskPlanningPromptTemplate()
+
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(task_planning_context)
+ available_documents = extractAvailableDocuments(task_planning_context)
+ workflow_history = extractWorkflowHistory(self.service, task_planning_context)
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "AVAILABLE_DOCUMENTS": available_documents,
+ "WORKFLOW_HISTORY": workflow_history
+ }
# Log task planning prompt sent to AI
logger.info("=== TASK PLANNING PROMPT SENT TO AI ===")
# Trace task planning prompt
- self.writeTraceLog("Task Plan Prompt", task_planning_prompt)
-
- # Centralized AI call: Task planning (quality, detailed)
+ self.writeTraceLog("Task Plan Prompt", task_planning_prompt_template)
+ self.writeTraceLog("Task Plan Placeholders", placeholders)
+ # Centralized AI call: Task planning (quality, detailed) with placeholders
options = AiCallOptions(
operationType=OperationType.GENERATE_PLAN,
priority=Priority.QUALITY,
@@ -137,9 +163,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- prompt = await self.services.ai.callAiText(
- prompt=task_planning_prompt,
- documents=None,
+ prompt = await self.services.ai.callAi(
+ prompt=task_planning_prompt_template,
+ placeholders=placeholders,
options=options
)
@@ -382,12 +408,30 @@ class HandlingTasks:
# Check workflow status before calling AI service
self._checkWorkflowStopped()
- # Generate the action definition prompt
- action_prompt = await createActionDefinitionPrompt(action_context, self.service)
- # Trace action planning prompt
- self.writeTraceLog("Action Plan Prompt", action_prompt)
+ # Generate the action definition prompt with placeholders
+ action_prompt_template = createActionDefinitionPromptTemplate()
- # Centralized AI call: Action planning (quality, detailed)
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(action_context)
+ available_documents = extractAvailableDocuments(action_context)
+ workflow_history = extractWorkflowHistory(self.service, action_context)
+ available_methods = extractAvailableMethods(self.service)
+ user_language = extractUserLanguage(self.service)
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "AVAILABLE_DOCUMENTS": available_documents,
+ "WORKFLOW_HISTORY": workflow_history,
+ "AVAILABLE_METHODS": available_methods,
+ "USER_LANGUAGE": user_language
+ }
+
+ # Trace action planning prompt
+ self.writeTraceLog("Action Plan Prompt", action_prompt_template)
+ self.writeTraceLog("Action Plan Placeholders", placeholders)
+
+ # Centralized AI call: Action planning (quality, detailed) with placeholders
options = AiCallOptions(
operationType=OperationType.GENERATE_PLAN,
priority=Priority.QUALITY,
@@ -398,9 +442,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- prompt = await self.services.ai.callAiText(
- prompt=action_prompt,
- documents=None,
+ prompt = await self.services.ai.callAi(
+ prompt=action_prompt_template,
+ placeholders=placeholders,
options=options
)
@@ -478,8 +522,25 @@ class HandlingTasks:
async def plan_select(self, context: TaskContext) -> Dict[str, Any]:
"""Plan: select exactly one action. Returns {"action": {method, name}}"""
- prompt = createActionSelectionPrompt(context, self.service)
- self.writeTraceLog("React Plan Selection Prompt", prompt)
+ prompt_template = createActionSelectionPromptTemplate()
+
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(context)
+ available_documents = extractAvailableDocuments(context)
+ user_language = extractUserLanguage(self.service)
+ available_methods = extractAvailableMethods(self.service)
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "AVAILABLE_DOCUMENTS": available_documents,
+ "USER_LANGUAGE": user_language,
+ "AVAILABLE_METHODS": available_methods
+ }
+
+ self.writeTraceLog("React Plan Selection Prompt", prompt_template)
+ self.writeTraceLog("React Plan Selection Placeholders", placeholders)
+
# Centralized AI call for plan selection (use plan generation quality)
options = AiCallOptions(
operationType=OperationType.GENERATE_PLAN,
@@ -491,9 +552,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- response = await self.services.ai.callAiText(
- prompt=prompt,
- documents=None,
+ response = await self.services.ai.callAi(
+ prompt=prompt_template,
+ placeholders=placeholders,
options=options
)
self.writeTraceLog("React Plan Selection Response", response)
@@ -509,8 +570,35 @@ class HandlingTasks:
async def act_execute(self, context: TaskContext, selection: Dict[str, Any], task_step: TaskStep, workflow, step_index: int) -> ActionResult:
"""Act: request minimal parameters then execute selected action."""
action = selection.get('action', {})
- params_prompt = createActionParameterPrompt(context, action, self.service)
- self.writeTraceLog("React Parameters Prompt", params_prompt)
+ prompt_template = createActionParameterPromptTemplate()
+
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(context)
+ available_documents = extractAvailableDocuments(context)
+ user_language = extractUserLanguage(self.service)
+
+ # Get action signature
+ method = action.get('method', '')
+ name = action.get('name', '')
+ action_signature = ""
+ if self.service and method in methods:
+ method_instance = methods[method]['instance']
+ action_signature = method_instance.getActionSignature(name)
+
+ selected_action = f"{method}.{name}"
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "AVAILABLE_DOCUMENTS": available_documents,
+ "USER_LANGUAGE": user_language,
+ "SELECTED_ACTION": selected_action,
+ "ACTION_SIGNATURE": action_signature
+ }
+
+ self.writeTraceLog("React Parameters Prompt", prompt_template)
+ self.writeTraceLog("React Parameters Placeholders", placeholders)
+
# Centralized AI call for parameter suggestion (balanced analysis)
options = AiCallOptions(
operationType=OperationType.ANALYSE_CONTENT,
@@ -522,9 +610,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- params_resp = await self.services.ai.callAiText(
- prompt=params_prompt,
- documents=None,
+ params_resp = await self.services.ai.callAi(
+ prompt=prompt_template,
+ placeholders=placeholders,
options=options
)
self.writeTraceLog("React Parameters Response", params_resp)
@@ -578,8 +666,21 @@ class HandlingTasks:
async def refine_decide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]:
"""Refine: decide continue or stop, with reason"""
- prompt = createRefinementPrompt(context, observation)
- self.writeTraceLog("React Refinement Prompt", prompt)
+ prompt_template = createRefinementPromptTemplate()
+
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(context)
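+ # Wrap the raw observation dict in a throwaway object so the shared
+ # extractReviewContent helper can read it via its `observation` attribute.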
+ review_content = extractReviewContent(type('Context', (), {'observation': observation})())
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "REVIEW_CONTENT": review_content
+ }
+
+ self.writeTraceLog("React Refinement Prompt", prompt_template)
+ self.writeTraceLog("React Refinement Placeholders", placeholders)
+
# Centralized AI call for refinement decision (balanced analysis)
options = AiCallOptions(
operationType=OperationType.ANALYSE_CONTENT,
@@ -591,9 +692,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- resp = await self.services.ai.callAiText(
- prompt=prompt,
- documents=None,
+ resp = await self.services.ai.callAi(
+ prompt=prompt_template,
+ placeholders=placeholders,
options=options
)
self.writeTraceLog("React Refinement Response", resp)
@@ -1100,8 +1201,18 @@ class HandlingTasks:
# Check workflow status before calling AI service
self._checkWorkflowStopped()
- # Use promptFactory for review prompt
- prompt = createResultReviewPrompt(review_context, self.service)
+ # Use placeholder-based review prompt
+ prompt_template = createResultReviewPromptTemplate()
+
+ # Extract content for placeholders
+ user_prompt = extractUserPrompt(review_context)
+ review_content = extractReviewContent(review_context)
+
+ # Create placeholders dictionary
+ placeholders = {
+ "USER_PROMPT": user_prompt,
+ "REVIEW_CONTENT": review_content
+ }
# Log result review prompt sent to AI
logger.info("=== RESULT REVIEW PROMPT SENT TO AI ===")
@@ -1109,9 +1220,10 @@ class HandlingTasks:
logger.info(f"Action Results Count: {len(review_context.action_results) if review_context.action_results else 0}")
logger.info(f"Task Actions Count: {len(review_context.task_actions) if review_context.task_actions else 0}")
# Trace result review prompt
- self.writeTraceLog("Result Review Prompt", prompt)
+ self.writeTraceLog("Result Review Prompt", prompt_template)
+ self.writeTraceLog("Result Review Placeholders", placeholders)
- # Centralized AI call: Result validation (balanced analysis)
+ # Centralized AI call: Result validation (balanced analysis) with placeholders
options = AiCallOptions(
operationType=OperationType.ANALYSE_CONTENT,
priority=Priority.BALANCED,
@@ -1122,9 +1234,9 @@ class HandlingTasks:
maxProcessingTime=30
)
- response = await self.services.ai.callAiText(
- prompt=prompt,
- documents=None,
+ response = await self.services.ai.callAi(
+ prompt=prompt_template,
+ placeholders=placeholders,
options=options
)
diff --git a/modules/workflows/processing/promptFactoryPlaceholders.py b/modules/workflows/processing/promptFactoryPlaceholders.py
new file mode 100644
index 00000000..de2ef6a7
--- /dev/null
+++ b/modules/workflows/processing/promptFactoryPlaceholders.py
@@ -0,0 +1,418 @@
+"""
+Placeholder-based prompt factory for dynamic AI calls.
+This module provides prompt templates with placeholders that can be filled dynamically.
+"""
+
+import json
+from typing import Dict, Any
+from modules.workflows.processing.promptFactory import (
+ _getAvailableDocuments,
+ _getPreviousRoundContext,
+ getMethodsList,
+ getEnhancedDocumentContext,
+ _getConnectionReferenceList,
+ methods
+)
+
+
+def createTaskPlanningPromptTemplate() -> str:
+ """Create task planning prompt template with placeholders."""
+ return """You are a task planning AI that analyzes user requests and creates structured, self-contained task plans with user-friendly feedback messages.
+
+USER REQUEST: {{KEY:USER_PROMPT}}
+
+AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}}
+
+PREVIOUS WORKFLOW ROUNDS CONTEXT:
+{{KEY:WORKFLOW_HISTORY}}
+
+INSTRUCTIONS:
+1. Analyze the user request, available documents, and previous workflow rounds context
+2. If the user request appears to be a follow-up (like "try again", "versuche es nochmals", "retry", etc.),
+ use the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what the user wants to retry or continue
+3. Group related topics and sequential steps into single, comprehensive tasks
+4. Focus on business outcomes, not technical operations
+5. Make each task self-contained: clearly state what to do and what outputs are expected
+6. Ensure proper handover between tasks (later actions will use your task outputs)
+7. Detect the language of the user request and include it in languageUserDetected
+8. Generate user-friendly messages for each task in the user's request language
+9. Return a JSON object with the exact structure shown below
+
+TASK GROUPING PRINCIPLES:
+- COMBINE RELATED TOPICS: Group related subjects, sequential steps, or workflow-structured activities into single tasks
+- SEQUENTIAL WORKFLOWS: If the user says "first do this, then that, then that" → create ONE task that handles the entire sequence
+- SIMILAR CONTENT: If multiple items deal with the same subject matter → combine into ONE comprehensive task
+- ONLY SPLIT WHEN DIFFERENT: Create separate tasks ONLY when the user explicitly wants different, independent things
+
+EXAMPLES OF GOOD TASK GROUPING:
+
+COMBINE INTO ONE TASK:
+- "Analyze the documents, extract key insights, and create a summary report" → ONE task: "Analyze documents and create comprehensive summary report"
+- "First check my emails, then respond to urgent ones, then organize my inbox" → ONE task: "Process and organize email inbox with priority responses"
+- "Review the budget, analyze spending patterns, and suggest cost-cutting measures" → ONE task: "Comprehensive budget analysis with optimization recommendations"
+- "Create a business strategy, develop marketing plan, and prepare presentation" → ONE task: "Develop complete business strategy with marketing plan and presentation"
+
+SPLIT INTO MULTIPLE TASKS:
+- "Create a business strategy for Q4" AND "Check my emails for messages from my assistant" → TWO separate tasks (different subjects)
+- "Analyze customer feedback" AND "Prepare quarterly financial report" → TWO separate tasks (different business areas)
+- "Review project timeline" AND "Update employee handbook" → TWO separate tasks (unrelated activities)
+
+TASK PLANNING PRINCIPLES:
+- Break down complex requests into logical, sequential steps
+- Focus on business value and outcomes
+- Keep tasks at a meaningful level of abstraction (not implementation details)
+- Each task should produce results that can be used by subsequent tasks
+- Ensure clear dependencies and handovers between tasks
+- Provide clear, actionable user messages in the user's request language
+- Group related activities to minimize task fragmentation
+- Only create multiple tasks when dealing with truly different, independent objectives
+- Make task objectives action-oriented and specific (include scope, data sources to consider, and output intent at high level)
+- Write success_criteria as measurable acceptance criteria focusing on outputs (what artifacts or insights will exist and how they are validated)
+
+FOLLOW-UP PROMPT HANDLING:
+- If the user request is a follow-up (e.g., "try again", "versuche es nochmals", "retry", "continue", "proceed"),
+ analyze the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what failed or was incomplete
+- Use the previous round's user requests and task outcomes to determine what the user wants to retry
+- If previous rounds failed due to missing documents, and documents are now available,
+ create tasks that use the newly available documents to accomplish the original request
+- Maintain the same business objective from previous rounds but adapt to current available resources
+
+SPECIFIC SCENARIO HANDLING:
+- If previous round failed with "documents missing" error and current round has documents available,
+ the user likely wants to retry the same operation with the newly provided documents
+- Example: Previous round "speichere mir die 3 dokumente im sharepoint unter xxx" failed due to missing documents,
+ current round "versuche es nochmals" with documents should retry the SharePoint save operation
+- Always check if the current request is a retry by looking for retry keywords and previous round context
+
+REQUIRED JSON STRUCTURE:
+{{
+ "overview": "Brief description of the overall plan",
+ "languageUserDetected": "en", // Language code detected from user request (en, de, fr, it, es, etc.)
+ "userMessage": "User-friendly message explaining the task plan in user's request language",
+ "tasks": [
+ {{
+ "id": "task_1",
+ "objective": "Clear business objective this task accomplishes (combining related activities)",
+ "dependencies": ["task_0"], // IDs of tasks that must complete first
+ "success_criteria": ["criteria1", "criteria2"],
+ "estimated_complexity": "low|medium|high",
+ "userMessage": "User-friendly message explaining what this task will accomplish in user's request language"
+ }}
+ ]
+}}
+
+EXAMPLES OF GOOD TASK OBJECTIVES (COMBINING RELATED ACTIVITIES):
+- "Analyze documents and extract key insights for business communication"
+- "Create professional business communication incorporating analyzed information"
+- "Execute business communication using specified channels and document outcomes"
+- "Develop comprehensive business strategy with implementation roadmap and success metrics"
+
+EXAMPLES OF WELL-FORMED SUCCESS CRITERIA (OUTPUT-FOCUSED):
+- "Deliver a prioritized list of 10–20 candidates with justification"
+- "Provide a structured JSON with fields: company, ticker, rationale, metrics"
+- "Produce a presentation outline with 5 sections and bullet points per section"
+- "Include data sources and date stamped references for traceability"
+
+EXAMPLES OF GOOD SUCCESS CRITERIA:
+- "Key insights extracted and ready for business use"
+- "Professional communication created with clear business value"
+- "Business communication successfully delivered and documented"
+- "All outcomes properly documented and accessible"
+
+EXAMPLES OF BAD TASK OBJECTIVES:
+- "Read the PDF file" (too granular - should be "Analyze document content")
+- "Convert data to CSV" (implementation detail - should be "Structure data for analysis")
+- "Send email" (too specific - should be "Deliver business communication")
+
+LANGUAGE DETECTION:
+- Analyze the user request text to identify the language
+- Use standard language codes: en (English), de (German), fr (French), it (Italian), es (Spanish), etc.
+- If the language cannot be determined, use "en" as default
+- Include the detected language in the languageUserDetected field
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
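+# Minimal sketch (illustrative; the real substitution happens inside the AI
+# service's callAi and is not shown here) of how {{KEY:NAME}} markers can be
+# filled. Plain str.format() is unsuitable because the templates also contain
+# literal {{ ... }} braces in their embedded JSON examples.
+def _fillPlaceholdersExample(template: str, placeholders: Dict[str, Any]) -> str:
+ filled = template
+ for key, value in placeholders.items():
+ filled = filled.replace("{{KEY:" + key + "}}", str(value))
+ return filled
+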
+
+def createActionDefinitionPromptTemplate() -> str:
+ """Create action definition prompt template with placeholders."""
+ return """You are an action planning AI that generates specific, executable actions for task steps.
+
+TASK OBJECTIVE: {{KEY:USER_PROMPT}}
+
+AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}}
+
+WORKFLOW HISTORY: {{KEY:WORKFLOW_HISTORY}}
+
+AVAILABLE METHODS: {{KEY:AVAILABLE_METHODS}}
+
+USER LANGUAGE: {{KEY:USER_LANGUAGE}}
+
+INSTRUCTIONS:
+- Generate actions to accomplish this task step using available documents, connections, and previous results
+- Use docItem for single documents and docList for groups of documents as shown in AVAILABLE DOCUMENTS
+- If there are no documents available, do not create document extraction actions. Select methods strictly based on the task objective; choose web actions when external information is required. Otherwise, generate a status/information report requesting needed inputs.
+- Always pass documentList as a LIST of references (docItem and/or docList) - this list CANNOT be empty for document extraction actions
+- For referencing documents from previous actions, use the format "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}"
+- Each action must be self-contained and executable with the provided parameters
+- For document extraction, ensure prompts are specific and detailed
+- Include validation steps in extraction prompts where relevant
+- If this is a retry, learn from previous failures and improve the approach
+- Address specific issues mentioned in previous review feedback
+- When specifying expectedDocumentFormat, ensure AI prompts explicitly request pure data without markdown formatting
+- Generate user-friendly messages for each action in the user's language
+
+PARAMETER COMPLETENESS REQUIREMENTS:
+- Every parameter must contain all information needed to execute without implicit context
+- Use explicit, concrete values (units, languages, formats, limits, date ranges, IDs) when applicable
+- For search-like parameters (if any method requires a query), derive the query from the task objective AND ALL success criteria dimensions. Include:
+ - Key entities and domain terms from the objective
+ - All distinct facets from success_criteria (e.g., valuation AND AI potential AND know-how needs)
+ - Geography/localization (e.g., Schweiz/Suisse/Switzerland; use multilingual synonyms when helpful)
+ - Time horizon or recency if relevant
+ - Boolean operators and synonyms to increase precision (use AND/OR, quotes, parentheses)
+ - Avoid single-topic or generic queries focused only on one facet (e.g., pure valuation metrics)
+ - When facets are truly distinct, create 1–3 focused actions with precise queries rather than one vague catch-all
+- Document list parameters must reference only existing labels or prior action outputs; do not reference future outputs
+
+DOCUMENT ROUTING GUIDANCE:
+- Each action should produce documents with a clear resultLabel for routing
+- Use consistent naming: "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}"
+- Ensure document flow: Action A produces documents that Action B can consume
+- Document labels should be descriptive of content, not just "results" or "output"
+- Consider what subsequent actions will need and structure outputs accordingly
+
+REQUIRED JSON STRUCTURE:
+{{
+ "actions": [
+ {{
+ "method": "method_name",
+ "action": "action_name",
+ "parameters": {{}},
+ "resultLabel": "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}",
+ "description": "Brief description of what this action accomplishes",
+ "userMessage": "User-friendly message explaining what this action will do in user's language"
+ }}
+ ]
+}}
+
+IMPORTANT NOTES:
+- Respond with ONLY the JSON object. Do not include any explanatory text.
+- Before creating any document extraction action, verify that AVAILABLE DOCUMENTS contains actual document references.
+- Always include a user-friendly userMessage for each action, written in the language specified by USER LANGUAGE above."""
+
+
+def createActionSelectionPromptTemplate() -> str:
+ """Create action selection prompt template with placeholders."""
+ return """Select exactly one action to advance the task.
+
+OBJECTIVE: {{KEY:USER_PROMPT}}
+AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}}
+USER LANGUAGE: {{KEY:USER_LANGUAGE}}
+
+MINIMAL TOOL CATALOG (method -> action -> [parameterNames]):
+{{KEY:AVAILABLE_METHODS}}
+
+BUSINESS RULES:
+- Pick exactly one action per step.
+- Derive choice from objective and success criteria.
+- Prefer user language.
+- Keep it minimal; avoid provider specifics.
+
+RESPONSE FORMAT (JSON only):
+{{"action":{{"method":"web","name":"search"}}}}"""
+
+
+def createActionParameterPromptTemplate() -> str:
+ """Create action parameter prompt template with placeholders."""
+ return """Provide only the required parameters for this action.
+
+SELECTED ACTION: {{KEY:SELECTED_ACTION}}
+ACTION SIGNATURE: {{KEY:ACTION_SIGNATURE}}
+OBJECTIVE: {{KEY:USER_PROMPT}}
+AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}}
+USER LANGUAGE: {{KEY:USER_LANGUAGE}}
+
+RULES:
+- Return only the parameters object.
+- Include user language if relevant.
+- Reference documents only by exact labels available.
+- Avoid unnecessary fields; host applies defaults.
+- Use the ACTION SIGNATURE above to understand what parameters are required.
+- Convert the objective into appropriate parameter values as needed.
+
+RESPONSE FORMAT (JSON only):
+{{"parameters":{{}}}}"""
+
+
+def createRefinementPromptTemplate() -> str:
+ """Create refinement prompt template with placeholders."""
+ return """Decide next step based on observation.
+
+OBJECTIVE: {{KEY:USER_PROMPT}}
+OBSERVATION:
+{{KEY:REVIEW_CONTENT}}
+
+RULES:
+- If criteria are met or no further action helps, decide stop.
+- Else decide continue.
+
+RESPONSE FORMAT (JSON only):
+{{"decision":"continue","reason":"Need more data"}}"""
+
+
+def createResultReviewPromptTemplate() -> str:
+ """Create result review prompt template with placeholders."""
+ return """You are a result validation AI that reviews task execution outcomes and determines success, retry needs, or failure.
+
+TASK OBJECTIVE: {{KEY:USER_PROMPT}}
+
+EXECUTION RESULTS:
+{{KEY:REVIEW_CONTENT}}
+
+VALIDATION CRITERIA:
+- Review each action's success/failure status
+- Check if required documents were produced
+- Validate document quality and completeness
+- Assess if success criteria were met
+- Identify any missing or incomplete outputs
+- Determine if retry would help or if task should be marked as failed
+
+REQUIRED JSON STRUCTURE:
+{{
+ "status": "success|retry|failed",
+ "reason": "Detailed explanation of the validation decision",
+ "improvements": ["specific improvement 1", "specific improvement 2"],
+ "quality_score": 8, // 1-10 scale
+ "met_criteria": ["criteria1", "criteria2"],
+ "unmet_criteria": ["criteria3", "criteria4"],
+ "confidence": 0.85, // 0.0-1.0 scale
+ "userMessage": "User-friendly message explaining the validation result"
+}}
+
+VALIDATION PRINCIPLES:
+- Be thorough but fair in assessment
+- Focus on business value and outcomes
+- Consider both technical execution and business results
+- Provide specific, actionable improvement suggestions
+- Use quality scores to track progress across retries
+- Clearly identify which success criteria were met vs. unmet
+- Set appropriate confidence levels based on evidence quality
+
+NOTE: Respond with ONLY the JSON object. Do not include any explanatory text."""
+
+
+# Helper functions to extract content for placeholders
+
+def extractUserPrompt(context) -> str:
+ """Extract user prompt from context."""
+ if hasattr(context, 'task_step') and context.task_step:
+ return context.task_step.objective or 'No request specified'
+ return 'No request specified'
+
+
+def extractAvailableDocuments(context) -> str:
+ """Extract available documents from context."""
+ if hasattr(context, 'workflow') and context.workflow:
+ return _getAvailableDocuments(context.workflow)
+ return "No documents available"
+
+
+def extractWorkflowHistory(service, context) -> str:
+ """Extract workflow history from context."""
+ if hasattr(context, 'workflow') and context.workflow:
+ return _getPreviousRoundContext(service, context.workflow) or "No previous workflow rounds - this is the first round."
+ return "No previous workflow rounds - this is the first round."
+
+
+def extractAvailableMethods(service) -> str:
+ """Extract available methods for action planning."""
+ methodList = getMethodsList(service)
+ method_actions = {}
+ for sig in methodList:
+ if '.' in sig:
+ method, rest = sig.split('.', 1)
+ action = rest.split('(')[0]
+ method_actions.setdefault(method, []).append((action, sig))
+
+ # Create a structured JSON format for better AI parsing
+ available_methods_json = {}
+ for method, actions in method_actions.items():
+ available_methods_json[method] = {}
+ # Get the method instance for accessing docstrings
+ method_instance = methods.get(method, {}).get('instance') if methods else None
+
+ for action, sig in actions:
+ if '(' in sig and ')' in sig:
+ # Parse parameters from the action's docstring rather than the raw
+ # signature string: the docstring also carries parameter descriptions.
+ parameters = []
+
+ # Get the actual function's docstring
+ if method_instance and hasattr(method_instance, action):
+ func = getattr(method_instance, action)
+ if hasattr(func, '__doc__') and func.__doc__:
+ docstring = func.__doc__
+
+ # Parse Parameters section from docstring
+ lines = docstring.split('\n')
+ in_parameters = False
+ for line in lines:
+ line = line.strip()
+
+ if line.startswith('Parameters:'):
+ in_parameters = True
+ continue
+ elif line.startswith('Returns:') or line.startswith('Raises:') or line.startswith('Note:') or line.startswith('Example:') or line.startswith('Examples:'):
+ in_parameters = False
+ continue
+ elif in_parameters and line and not line.startswith('-') and not line.startswith('*'):
+ # This is a parameter line
+ if ':' in line:
+ param_name = line.split(':')[0].strip()
+ param_desc = line.split(':', 1)[1].strip()
+ parameters.append(f"{param_name}: {param_desc}")
+
+ available_methods_json[method][action] = parameters
+ else:
+ available_methods_json[method][action] = []
+
+ return json.dumps(available_methods_json, indent=2, ensure_ascii=False)
+
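+# Illustrative input/output: an action whose docstring contains
+#
+# Parameters:
+# aiPrompt (str): The AI prompt for processing
+# documentList (list, optional): Documents to include in context
+#
+# contributes {"<method>": {"<action>": ["aiPrompt (str): The AI prompt for
+# processing", "documentList (list, optional): Documents to include in
+# context"]}} to the returned JSON.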
+
+def extractUserLanguage(service) -> str:
+ """Extract user language from service."""
+ return service.user.language if service and service.user else 'en'
+
+
+def extractReviewContent(context) -> str:
+ """Extract review content from context."""
+ if hasattr(context, 'action_results') and context.action_results:
+ # Build result summary
+ result_summary = ""
+ for i, result in enumerate(context.action_results):
+ result_summary += f"\nRESULT {i+1}:\n"
+ result_summary += f" Success: {result.success}\n"
+ if result.error:
+ result_summary += f" Error: {result.error}\n"
+
+ if result.documents:
+ result_summary += f" Documents: {len(result.documents)} document(s)\n"
+ for doc in result.documents:
+ doc_name = getattr(doc, 'documentName', 'Unknown')
+ doc_mime = getattr(doc, 'mimeType', 'Unknown')
+ result_summary += f" - {doc_name} ({doc_mime})\n"
+ else:
+ result_summary += f" Documents: None\n"
+
+ return result_summary
+ elif hasattr(context, 'observation') and context.observation:
+ return json.dumps(context.observation, ensure_ascii=False)
+ else:
+ return "No review content available"