diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 2d2e5e95..bab071b6 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -1,4 +1,4 @@ -from typing import Optional, List +from typing import Optional, List, Dict, Any, Literal from pydantic import BaseModel, Field @@ -81,18 +81,36 @@ PROCESSING_MODE_PRIORITY_MAPPING = { } +class ModelCapabilities(BaseModel): + """Model capabilities and characteristics for dynamic selection.""" + + name: str = Field(description="Model name/identifier") + maxTokens: int = Field(description="Maximum token limit for this model") + capabilities: List[str] = Field(description="List of capabilities: text, image, vision, reasoning, analysis, etc.") + costPerToken: float = Field(default=0.0, description="Cost per token (if available)") + processingTime: float = Field(default=1.0, description="Average processing time multiplier") + isAvailable: bool = Field(default=True, description="Whether model is currently available") + + class AiCallOptions(BaseModel): """Options for centralized AI processing with clear operation types and tags.""" operationType: str = Field(default="general", description="Type of operation: general, generate_plan, analyse_content, generate_content, web_research") priority: str = Field(default="balanced", description="speed|quality|cost|balanced") compressPrompt: bool = Field(default=True, description="Whether to compress the prompt") - compressContext: bool = Field(default=True, description="Whether to compress optional context") + compressContext: bool = Field(default=True, description="If False: process each chunk; If True: summarize and work on summary") + processDocumentsIndividually: bool = Field(default=True, description="If True, process each document separately; else pool docs") + maxContextBytes: Optional[int] = Field(default=None, description="Hard cap for extracted context size passed to the model") maxCost: Optional[float] = Field(default=None, description="Max cost budget") maxProcessingTime: Optional[int] = Field(default=None, description="Max processing time in seconds") requiredTags: Optional[List[str]] = Field(default=None, description="Required model tags for selection") processingMode: str = Field(default="basic", description="Processing mode: basic, advanced, detailed") resultFormat: Optional[str] = Field(default=None, description="Expected result format: txt, json, csv, xml, etc.") + + # New fields for dynamic strategy + callType: Literal["planning", "text"] = Field(default="text", description="Call type: planning or text") + safetyMargin: float = Field(default=0.1, ge=0.0, le=0.5, description="Safety margin for token limits (0.0-0.5)") + modelCapabilities: Optional[List[str]] = Field(default=None, description="Required model capabilities for filtering") class AiCallRequest(BaseModel): diff --git a/modules/datamodels/datamodelExtraction.py b/modules/datamodels/datamodelExtraction.py new file mode 100644 index 00000000..78ce657e --- /dev/null +++ b/modules/datamodels/datamodelExtraction.py @@ -0,0 +1,21 @@ +from typing import Any, Dict, List, Optional +from dataclasses import dataclass, field + + +@dataclass +class ContentPart: + id: str + parentId: Optional[str] + label: str + typeGroup: str + mimeType: str + data: str + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ExtractedContent: + id: str + parts: List[ContentPart] + summary: Optional[Dict[str, Any]] = None + diff --git 
a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 0bd00a90..be0b0bad 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -2,8 +2,8 @@ import logging from typing import Dict, Any, List, Optional, Tuple, Union from modules.datamodels.datamodelChat import ChatDocument -from modules.services.serviceDocument.mainServiceDocumentExtraction import DocumentExtractionService -from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions +from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, ModelCapabilities, OperationType, Priority from modules.datamodels.datamodelWeb import ( WebSearchRequest, WebCrawlRequest, @@ -34,7 +34,7 @@ class AiService: self.serviceCenter = serviceCenter # Only depend on interfaces self.aiObjects = None # Will be initialized in create() - self.documentExtractor = DocumentExtractionService() + self.extractionService = ExtractionService() @classmethod async def create(cls, serviceCenter=None) -> "AiService": @@ -59,10 +59,14 @@ class AiService: documents, options.operationType if options else "general", options.compressContext if options else True, - processDocumentsIndividually, + options.processDocumentsIndividually if options else processDocumentsIndividually, ) effectiveOptions = options or AiCallOptions() + # Default maxContextBytes when none is provided; conservative per-model defaults could be added here + if options and options.maxContextBytes is None: + options.maxContextBytes = 16000 # bytes, conservative default if model limit unknown + request = AiCallRequest( prompt=prompt, context=documentContent or None, @@ -167,42 +171,60 @@ class AiService: if not documents: return "" + # Build extraction options + extractionOptions: Dict[str, Any] = { + "prompt": f"Extract relevant content for {operationType}", + "operationType": operationType, + "processDocumentsIndividually": processIndividually, + # Respect size/chunking hints if provided via AiCallOptions + "maxSize": getattr(getattr(self, "_aiOptions", None), "maxContextBytes", None) or 0, + "chunkAllowed": getattr(getattr(self, "_aiOptions", None), "chunkAllowed", True), + # basic merge strategy for text by parent + "mergeStrategy": {"groupBy": "parentId", "orderBy": "pageIndex"}, + } + + # Prepare documentList for extractor + documentList: List[Dict[str, Any]] = [] + for d in documents: + documentList.append({ + "id": d.id, + "bytes": d.fileData, + "fileName": d.fileName, + "mimeType": d.mimeType, + }) + processedContents: List[str] = [] - for doc in documents: - try: - extracted = await self.documentExtractor.processFileData( - doc.fileData, - doc.fileName, - doc.mimeType, - prompt=f"Extract relevant content for {operationType}", - documentId=doc.id, - enableAI=True, - ) - docContent: List[str] = [] - for contentItem in extracted.contents: - if contentItem.data and contentItem.data.strip(): - docContent.append(contentItem.data) + try: + extractionResult = self.extractionService.extractDocuments(documentList, extractionOptions) - if docContent: - combinedDocContent = "\n\n".join(docContent) - if ( - compressDocuments - and len(combinedDocContent.encode("utf-8")) > 10000 - ): - combinedDocContent = await self._compressContent( - combinedDocContent, 10000, "document" - ) - processedContents.append( - f"Document: {doc.fileName}\n{combinedDocContent}" - ) - except Exception as e: - logger.warning( -
f"Error processing document {doc.fileName}: {str(e)}" - ) - processedContents.append( - f"Document: {doc.fileName}\n[Error processing document: {str(e)}]" - ) + def _partsToText(parts) -> str: + lines: List[str] = [] + for p in parts: + if p.typeGroup in ("text", "table", "structure") and p.data and isinstance(p.data, str): + lines.append(p.data) + return "\n\n".join(lines) + + if processIndividually and isinstance(extractionResult, list): + for i, ec in enumerate(extractionResult): + try: + contentText = _partsToText(ec.parts) + if compressDocuments and len(contentText.encode("utf-8")) > 10000: + contentText = await self._compressContent(contentText, 10000, "document") + processedContents.append(contentText) + except Exception as e: + logger.warning(f"Error aggregating extracted content: {str(e)}") + processedContents.append("[Error aggregating content]") + else: + # pooled mode returns dict + parts = extractionResult.get("parts", []) if isinstance(extractionResult, dict) else [] + contentText = _partsToText(parts) + if compressDocuments and len(contentText.encode("utf-8")) > 10000: + contentText = await self._compressContent(contentText, 10000, "document") + processedContents.append(contentText) + except Exception as e: + logger.warning(f"Error during extraction: {str(e)}") + processedContents.append("[Error during extraction]") return "\n\n---\n\n".join(processedContents) @@ -227,3 +249,267 @@ class AiService: logger.warning(f"AI compression failed, using truncation: {str(e)}") return content[:targetSize] + "... [truncated]" + # ===== DYNAMIC GENERIC AI CALLS IMPLEMENTATION ===== + + async def callAi( + self, + prompt: str, + documents: Optional[List[ChatDocument]] = None, + placeholders: Optional[Dict[str, str]] = None, + options: Optional[AiCallOptions] = None + ) -> str: + """ + Unified AI call interface that automatically routes to appropriate handler. + + Args: + prompt: The main prompt for the AI call + documents: Optional list of documents to process + placeholders: Optional dictionary of placeholder replacements for planning calls + options: AI call configuration options + + Returns: + AI response as string + + Raises: + Exception: If all available models fail + """ + if options is None: + options = AiCallOptions() + + # Auto-determine call type based on documents and operation type + call_type = self._determineCallType(documents, options.operationType) + options.callType = call_type + + if call_type == "planning": + return await self._callAiPlanning(prompt, placeholders, options) + else: + return await self._callAiText(prompt, documents, options) + + def _determineCallType(self, documents: Optional[List[ChatDocument]], operation_type: str) -> str: + """ + Determine call type based on documents and operation type. + + Criteria: no documents AND (operationType is "generate_plan" or "analyse_content") -> planning + """ + has_documents = documents is not None and len(documents) > 0 + is_planning_operation = operation_type in [OperationType.GENERATE_PLAN, OperationType.ANALYSE_CONTENT] + + if not has_documents and is_planning_operation: + return "planning" + else: + return "text" + + async def _callAiPlanning( + self, + prompt: str, + placeholders: Optional[Dict[str, str]], + options: AiCallOptions + ) -> str: + """ + Handle planning calls with placeholder system and selective summarization. 
+ """ + # Get available models for planning (text + reasoning capabilities) + models = self._getModelsForOperation("planning", options) + + for model in models: + try: + # Build full prompt with placeholders + full_prompt = self._buildPromptWithPlaceholders(prompt, placeholders) + + # Check size and reduce if needed + if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin): + full_prompt = self._reducePlanningPrompt(full_prompt, placeholders, model, options) + + # Make AI call using existing callAiText + result = await self.callAiText( + prompt=full_prompt, + documents=None, + options=options + ) + return result + + except Exception as e: + logger.warning(f"Planning model {model.name} failed: {e}") + continue + + raise Exception("All planning models failed - check model availability and capabilities") + + async def _callAiText( + self, + prompt: str, + documents: Optional[List[ChatDocument]], + options: AiCallOptions + ) -> str: + """ + Handle text calls with document processing through ExtractionService. + """ + # Get available models for text processing + models = self._getModelsForOperation("text", options) + + for model in models: + try: + # Extract and process documents using ExtractionService + context = "" + if documents: + # Convert ChatDocument to documentList format for ExtractionService + documentList = [{ + "id": d.id, + "bytes": d.fileData, + "fileName": d.fileName, + "mimeType": d.mimeType + } for d in documents] + + extracted_content = await self.extractionService.extractDocuments( + documentList=documentList, + options={ + "prompt": prompt, + "operationType": options.operationType, + "processDocumentsIndividually": options.processDocumentsIndividually, + "maxSize": options.maxContextBytes or int(model.maxTokens * 0.9), + "chunkAllowed": not options.compressContext, + "mergeStrategy": {"groupBy": "typeGroup"} + } + ) + + # Get text content from extracted parts using typeGroup-aware processing + context = self._extractTextFromContentParts(extracted_content) + + # Check size and reduce if needed + full_prompt = prompt + "\n\n" + context if context else prompt + if self._exceedsTokenLimit(full_prompt, model, options.safetyMargin): + full_prompt = self._reduceTextPrompt(prompt, context, model, options) + + # Make AI call using existing callAiText + result = await self.callAiText( + prompt=full_prompt, + documents=None, + options=options + ) + return result + + except Exception as e: + logger.warning(f"Text model {model.name} failed: {e}") + continue + + raise Exception("All text models failed - check model availability and capabilities") + + def _getModelsForOperation(self, operation_type: str, options: AiCallOptions) -> List[ModelCapabilities]: + """ + Get models capable of handling the specific operation with capability filtering. + """ + # For now, return a default model - this will be enhanced with actual model registry + default_model = ModelCapabilities( + name="default", + maxTokens=4000, + capabilities=["text", "reasoning"] if operation_type == "planning" else ["text"], + costPerToken=0.001, + processingTime=1.0, + isAvailable=True + ) + return [default_model] + + def _buildPromptWithPlaceholders(self, prompt: str, placeholders: Optional[Dict[str, str]]) -> str: + """ + Build full prompt by replacing placeholders with their content. + Uses the new {{KEY:placeholder}} format. 
+ """ + if not placeholders: + return prompt + + full_prompt = prompt + for placeholder, content in placeholders.items(): + # Replace both old format {{placeholder}} and new format {{KEY:placeholder}} + full_prompt = full_prompt.replace(f"{{{{{placeholder}}}}}", content) + full_prompt = full_prompt.replace(f"{{{{KEY:{placeholder}}}}}", content) + + return full_prompt + + def _exceedsTokenLimit(self, text: str, model: ModelCapabilities, safety_margin: float) -> bool: + """ + Check if text exceeds model token limit with safety margin. + """ + # Simple character-based estimation (4 chars per token) + estimated_tokens = len(text) // 4 + max_tokens = int(model.maxTokens * (1 - safety_margin)) + return estimated_tokens > max_tokens + + def _reducePlanningPrompt( + self, + full_prompt: str, + placeholders: Optional[Dict[str, str]], + model: ModelCapabilities, + options: AiCallOptions + ) -> str: + """ + Reduce planning prompt size by summarizing placeholders while preserving prompt structure. + """ + if not placeholders: + return self._reduceText(full_prompt, 0.7) + + # Reduce placeholders while preserving prompt + reduced_placeholders = {} + for placeholder, content in placeholders.items(): + if len(content) > 1000: # Only reduce long content + reduction_factor = 0.7 + reduced_content = self._reduceText(content, reduction_factor) + reduced_placeholders[placeholder] = reduced_content + else: + reduced_placeholders[placeholder] = content + + return self._buildPromptWithPlaceholders(full_prompt, reduced_placeholders) + + def _reduceTextPrompt( + self, + prompt: str, + context: str, + model: ModelCapabilities, + options: AiCallOptions + ) -> str: + """ + Reduce text prompt size using typeGroup-aware chunking and merging. + """ + max_size = int(model.maxTokens * (1 - options.safetyMargin)) + + if options.compressPrompt: + # Reduce both prompt and context + target_size = max_size + current_size = len(prompt) + len(context) + reduction_factor = (target_size * 0.7) / current_size + + if reduction_factor < 1.0: + prompt = self._reduceText(prompt, reduction_factor) + context = self._reduceText(context, reduction_factor) + else: + # Only reduce context, preserve prompt integrity + max_context_size = max_size - len(prompt) + if len(context) > max_context_size: + reduction_factor = max_context_size / len(context) + context = self._reduceText(context, reduction_factor) + + return prompt + "\n\n" + context if context else prompt + + def _extractTextFromContentParts(self, extracted_content) -> str: + """ + Extract text content from ExtractionService ContentPart objects. + """ + if not extracted_content or not hasattr(extracted_content, 'parts'): + return "" + + text_parts = [] + for part in extracted_content.parts: + if hasattr(part, 'typeGroup') and part.typeGroup in ['text', 'table', 'structure']: + if hasattr(part, 'data') and part.data: + text_parts.append(part.data) + + return "\n\n".join(text_parts) + + def _reduceText(self, text: str, reduction_factor: float) -> str: + """ + Reduce text size by the specified factor. + """ + if reduction_factor >= 1.0: + return text + + target_length = int(len(text) * reduction_factor) + return text[:target_length] + "... 
[reduced]" + diff --git a/modules/services/serviceExtraction/__init__.py b/modules/services/serviceExtraction/__init__.py new file mode 100644 index 00000000..9fe4727e --- /dev/null +++ b/modules/services/serviceExtraction/__init__.py @@ -0,0 +1,5 @@ +from .mainServiceExtraction import ExtractionService + +__all__ = ["ExtractionService"] + + diff --git a/modules/services/serviceExtraction/chunking/__init__.py b/modules/services/serviceExtraction/chunking/__init__.py new file mode 100644 index 00000000..139597f9 --- /dev/null +++ b/modules/services/serviceExtraction/chunking/__init__.py @@ -0,0 +1,2 @@ + + diff --git a/modules/services/serviceExtraction/chunking/structure_chunker.py b/modules/services/serviceExtraction/chunking/structure_chunker.py new file mode 100644 index 00000000..921b3d4d --- /dev/null +++ b/modules/services/serviceExtraction/chunking/structure_chunker.py @@ -0,0 +1,59 @@ +from typing import Any, Dict, List +import json + +from ..types import ContentPart +from ..subRegistry import Chunker + + +class StructureChunker(Chunker): + def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]: + maxBytes = int(options.get("structureChunkSize", 40000)) + data = part.data or "" + # best-effort: try JSON list/object bucketing; else fallback to line-based + chunks: List[Dict[str, Any]] = [] + try: + obj = json.loads(data) + def emit(bucket: Any): + text = json.dumps(bucket, ensure_ascii=False) + chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)}) + if isinstance(obj, list): + bucket: list[Any] = [] + size = 0 + for item in obj: + text = json.dumps(item, ensure_ascii=False) + s = len(text.encode('utf-8')) + if size + s > maxBytes and bucket: + emit(bucket) + bucket = [item] + size = s + else: + bucket.append(item) + size += s + if bucket: + emit(bucket) + else: + text = json.dumps(obj, ensure_ascii=False) + if len(text.encode('utf-8')) <= maxBytes: + emit(obj) + else: + # fallback to line chunking + raise ValueError("too large") + except Exception: + current: List[str] = [] + size = 0 + for line in data.split('\n'): + s = len(line.encode('utf-8')) + 1 + if size + s > maxBytes and current: + text = '\n'.join(current) + chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)}) + current = [line] + size = s + else: + current.append(line) + size += s + if current: + text = '\n'.join(current) + chunks.append({"data": text, "size": len(text.encode('utf-8')), "order": len(chunks)}) + return chunks + + diff --git a/modules/services/serviceExtraction/chunking/table_chunker.py b/modules/services/serviceExtraction/chunking/table_chunker.py new file mode 100644 index 00000000..9a614896 --- /dev/null +++ b/modules/services/serviceExtraction/chunking/table_chunker.py @@ -0,0 +1,28 @@ +from typing import Any, Dict, List + +from ..types import ContentPart +from ..subRegistry import Chunker + + +class TableChunker(Chunker): + def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]: + maxBytes = int(options.get("tableChunkSize", 40000)) + chunks: List[Dict[str, Any]] = [] + current: List[str] = [] + size = 0 + for line in part.data.split('\n'): + lineSize = len(line.encode('utf-8')) + 1 + if size + lineSize > maxBytes and current: + data = '\n'.join(current) + chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)}) + current = [line] + size = lineSize + else: + current.append(line) + size += lineSize + if current: + data = '\n'.join(current) + 
chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)}) + return chunks + + diff --git a/modules/services/serviceExtraction/chunking/text_chunker.py b/modules/services/serviceExtraction/chunking/text_chunker.py new file mode 100644 index 00000000..69218837 --- /dev/null +++ b/modules/services/serviceExtraction/chunking/text_chunker.py @@ -0,0 +1,28 @@ +from typing import Any, Dict, List + +from ..types import ContentPart +from ..subRegistry import Chunker + + +class TextChunker(Chunker): + def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]: + maxBytes = int(options.get("textChunkSize", 40000)) + chunks: List[Dict[str, Any]] = [] + current: List[str] = [] + size = 0 + for line in part.data.split('\n'): + lineSize = len(line.encode('utf-8')) + 1 + if size + lineSize > maxBytes and current: + data = '\n'.join(current) + chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)}) + current = [line] + size = lineSize + else: + current.append(line) + size += lineSize + if current: + data = '\n'.join(current) + chunks.append({"data": data, "size": len(data.encode('utf-8')), "order": len(chunks)}) + return chunks + + diff --git a/modules/services/serviceExtraction/formats/__init__.py b/modules/services/serviceExtraction/formats/__init__.py new file mode 100644 index 00000000..139597f9 --- /dev/null +++ b/modules/services/serviceExtraction/formats/__init__.py @@ -0,0 +1,2 @@ + + diff --git a/modules/services/serviceExtraction/formats/binary_extractor.py b/modules/services/serviceExtraction/formats/binary_extractor.py new file mode 100644 index 00000000..1c201c36 --- /dev/null +++ b/modules/services/serviceExtraction/formats/binary_extractor.py @@ -0,0 +1,25 @@ +from typing import Any, Dict, List +import base64 + +from ..utils import makeId +from modules.datamodels.datamodelExtraction import ContentPart +from ..subRegistry import Extractor + + +class BinaryExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return True + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + mimeType = context.get("mimeType") or "application/octet-stream" + return [ContentPart( + id=makeId(), + parentId=None, + label="binary", + typeGroup="binary", + mimeType=mimeType, + data=base64.b64encode(fileBytes).decode("utf-8"), + metadata={"size": len(fileBytes), "warning": "Unsupported file type"} + )] + + diff --git a/modules/services/serviceExtraction/formats/csv_extractor.py b/modules/services/serviceExtraction/formats/csv_extractor.py new file mode 100644 index 00000000..db3cf969 --- /dev/null +++ b/modules/services/serviceExtraction/formats/csv_extractor.py @@ -0,0 +1,26 @@ +from typing import Any, Dict, List + +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId +from ..subRegistry import Extractor + + +class CsvExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "text/csv" or (fileName or "").lower().endswith(".csv") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + fileName = context.get("fileName") + mimeType = context.get("mimeType") or "text/csv" + data = fileBytes.decode("utf-8", errors="replace") + return [ContentPart( + id=makeId(), + parentId=None, + label="main", + typeGroup="table", + mimeType=mimeType, + data=data, + metadata={"size": len(fileBytes)} + )] + + diff --git 
a/modules/services/serviceExtraction/formats/docx_extractor.py b/modules/services/serviceExtraction/formats/docx_extractor.py new file mode 100644 index 00000000..6cb75716 --- /dev/null +++ b/modules/services/serviceExtraction/formats/docx_extractor.py @@ -0,0 +1,89 @@ +from typing import Any, Dict, List +import io + +from ..utils import makeId +from modules.datamodels.datamodelExtraction import ContentPart +from ..subRegistry import Extractor + + +class DocxExtractor(Extractor): + def __init__(self): + self._loaded = False + self._haveLibs = False + + def _load(self): + if self._loaded: + return + self._loaded = True + try: + global docx + import docx # python-docx + self._haveLibs = True + except Exception: + self._haveLibs = False + + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "application/vnd.openxmlformats-officedocument.wordprocessingml.document" or (fileName or "").lower().endswith(".docx") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + self._load() + parts: List[ContentPart] = [] + rootId = makeId() + parts.append(ContentPart( + id=rootId, + parentId=None, + label="docx", + typeGroup="container", + mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document", + data="", + metadata={"size": len(fileBytes)} + )) + + if not self._haveLibs: + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label="binary", + typeGroup="binary", + mimeType="application/vnd.openxmlformats-officedocument.wordprocessingml.document", + data="", + metadata={"size": len(fileBytes), "warning": "DOCX lib not available"} + )) + return parts + + with io.BytesIO(fileBytes) as buf: + d = docx.Document(buf) + # paragraphs + for i, para in enumerate(d.paragraphs): + text = para.text or "" + if text.strip(): + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label=f"p_{i+1}", + typeGroup="text", + mimeType="text/plain", + data=text, + metadata={"size": len(text.encode('utf-8'))} + )) + # tables → CSV rows + for ti, table in enumerate(d.tables): + rows: list[str] = [] + for row in table.rows: + cells = [ (cell.text or "").replace('"', '""') for cell in row.cells ] + rows.append(",".join([f'"{c}"' for c in cells])) + csvData = "\n".join(rows) + if csvData: + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label=f"table_{ti+1}", + typeGroup="table", + mimeType="text/csv", + data=csvData, + metadata={"size": len(csvData.encode('utf-8'))} + )) + + return parts + + diff --git a/modules/services/serviceExtraction/formats/html_extractor.py b/modules/services/serviceExtraction/formats/html_extractor.py new file mode 100644 index 00000000..6c49c50c --- /dev/null +++ b/modules/services/serviceExtraction/formats/html_extractor.py @@ -0,0 +1,30 @@ +from typing import Any, Dict, List +from bs4 import BeautifulSoup + +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId +from ..subRegistry import Extractor + + +class HtmlExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "text/html" or (fileName or "").lower().endswith((".html", ".htm")) + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + mimeType = context.get("mimeType") or "text/html" + text = fileBytes.decode("utf-8", errors="replace") + # best-effort parse to validate the markup; the raw HTML is returned either way + try: + BeautifulSoup(text, "html.parser") + except Exception: + pass + return [ContentPart( + id=makeId(), + parentId=None, + label="main",
typeGroup="structure", + mimeType=mimeType, + data=text, + metadata={"size": len(fileBytes)} + )] + + diff --git a/modules/services/serviceExtraction/formats/image_extractor.py b/modules/services/serviceExtraction/formats/image_extractor.py new file mode 100644 index 00000000..296eb50b --- /dev/null +++ b/modules/services/serviceExtraction/formats/image_extractor.py @@ -0,0 +1,25 @@ +from typing import Any, Dict, List +import base64 + +from ..utils import makeId +from modules.datamodels.datamodelExtraction import ContentPart +from ..subRegistry import Extractor + + +class ImageExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return (mimeType or "").startswith("image/") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + mimeType = context.get("mimeType") or "image/unknown" + return [ContentPart( + id=makeId(), + parentId=None, + label="image", + typeGroup="image", + mimeType=mimeType, + data=base64.b64encode(fileBytes).decode("utf-8"), + metadata={"size": len(fileBytes)} + )] + + diff --git a/modules/services/serviceExtraction/formats/json_extractor.py b/modules/services/serviceExtraction/formats/json_extractor.py new file mode 100644 index 00000000..456eb08e --- /dev/null +++ b/modules/services/serviceExtraction/formats/json_extractor.py @@ -0,0 +1,31 @@ +from typing import Any, Dict, List +import json + +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId +from ..subRegistry import Extractor + + +class JsonExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "application/json" or (fileName or "").lower().endswith(".json") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + mimeType = context.get("mimeType") or "application/json" + text = fileBytes.decode("utf-8", errors="replace") + # verify JSON is well-formed; fall back to text if not + try: + json.loads(text) + except Exception: + pass + return [ContentPart( + id=makeId(), + parentId=None, + label="main", + typeGroup="structure", + mimeType=mimeType, + data=text, + metadata={"size": len(fileBytes)} + )] + + diff --git a/modules/services/serviceExtraction/formats/pdf_extractor.py b/modules/services/serviceExtraction/formats/pdf_extractor.py new file mode 100644 index 00000000..4d0d8058 --- /dev/null +++ b/modules/services/serviceExtraction/formats/pdf_extractor.py @@ -0,0 +1,110 @@ +from typing import Any, Dict, List +import base64 +import io + +from ..utils import makeId +from modules.datamodels.datamodelExtraction import ContentPart +from ..subRegistry import Extractor + + +class PdfExtractor(Extractor): + def __init__(self): + self._loaded = False + self._haveLibs = False + + def _load(self): + if self._loaded: + return + self._loaded = True + try: + global PyPDF2, fitz + import PyPDF2 + import fitz # PyMuPDF + self._haveLibs = True + except Exception: + self._haveLibs = False + + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "application/pdf" or (fileName or "").lower().endswith(".pdf") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + self._load() + parts: List[ContentPart] = [] + rootId = makeId() + parts.append(ContentPart( + id=rootId, + parentId=None, + label="pdf", + typeGroup="container", + mimeType="application/pdf", + data="", + metadata={"size": len(fileBytes)} + )) + + if not self._haveLibs: + 
parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label="binary", + typeGroup="binary", + mimeType="application/pdf", + data=base64.b64encode(fileBytes).decode("utf-8"), + metadata={"size": len(fileBytes), "warning": "PDF libs not available"} + )) + return parts + + # Extract text per page with PyPDF2 + try: + with io.BytesIO(fileBytes) as buf: + reader = PyPDF2.PdfReader(buf) + for i, page in enumerate(reader.pages): + try: + text = page.extract_text() or "" + if text.strip(): + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label=f"page_{i+1}", + typeGroup="text", + mimeType="text/plain", + data=text, + metadata={"pages": 1, "pageIndex": i, "size": len(text.encode('utf-8'))} + )) + except Exception: + continue + except Exception: + pass + + # Extract images with PyMuPDF + try: + with io.BytesIO(fileBytes) as buf2: + doc = fitz.open(stream=buf2, filetype="pdf") + for i in range(len(doc)): + page = doc[i] + images = page.get_images(full=True) + for j, img in enumerate(images): + try: + xref = img[0] + baseImage = doc.extract_image(xref) + if baseImage: + imgBytes = baseImage.get("image", b"") + ext = baseImage.get("ext", "png") + if imgBytes: + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label=f"image_{i+1}_{j}", + typeGroup="image", + mimeType=f"image/{ext}", + data=base64.b64encode(imgBytes).decode("utf-8"), + metadata={"pageIndex": i, "size": len(imgBytes)} + )) + except Exception: + continue + doc.close() + except Exception: + pass + + return parts + + diff --git a/modules/services/serviceExtraction/formats/text_extractor.py b/modules/services/serviceExtraction/formats/text_extractor.py new file mode 100644 index 00000000..5099d04c --- /dev/null +++ b/modules/services/serviceExtraction/formats/text_extractor.py @@ -0,0 +1,26 @@ +from typing import Any, Dict, List + +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId +from ..subRegistry import Extractor + + +class TextExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType in ("text/plain", "text/markdown") + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + fileName = context.get("fileName") + mimeType = context.get("mimeType") or "text/plain" + data = fileBytes.decode("utf-8", errors="replace") + return [ContentPart( + id=makeId(), + parentId=None, + label="main", + typeGroup="text", + mimeType=mimeType, + data=data, + metadata={"size": len(fileBytes)} + )] + + diff --git a/modules/services/serviceExtraction/formats/xlsx_extractor.py b/modules/services/serviceExtraction/formats/xlsx_extractor.py new file mode 100644 index 00000000..577b0776 --- /dev/null +++ b/modules/services/serviceExtraction/formats/xlsx_extractor.py @@ -0,0 +1,93 @@ +from typing import Any, Dict, List +import io +from datetime import datetime + +from ..utils import makeId +from modules.datamodels.datamodelExtraction import ContentPart +from ..subRegistry import Extractor + + +class XlsxExtractor(Extractor): + def __init__(self): + self._loaded = False + self._haveLibs = False + + def _load(self): + if self._loaded: + return + self._loaded = True + try: + global openpyxl + import openpyxl + self._haveLibs = True + except Exception: + self._haveLibs = False + + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + mt = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" + return mimeType == mt or (fileName or "").lower().endswith((".xlsx", ".xlsm")) 
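+ # Usage sketch (illustrative): XlsxExtractor().extract(fileBytes, {"fileName": "report.xlsx", "mimeType": mt}) + # returns one container part plus one CSV table part per sheet.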
+ + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + self._load() + parts: List[ContentPart] = [] + rootId = makeId() + parts.append(ContentPart( + id=rootId, + parentId=None, + label="xlsx", + typeGroup="container", + mimeType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + data="", + metadata={"size": len(fileBytes)} + )) + + if not self._haveLibs: + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label="binary", + typeGroup="binary", + mimeType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + data="", + metadata={"size": len(fileBytes), "warning": "openpyxl not available"} + )) + return parts + + with io.BytesIO(fileBytes) as buf: + wb = openpyxl.load_workbook(buf, data_only=True) + for sheetName in wb.sheetnames: + ws = wb[sheetName] + # extract rectangular data region by min/max + min_row = ws.min_row + max_row = ws.max_row + min_col = ws.min_column + max_col = ws.max_column + lines: list[str] = [] + for r in range(min_row, max_row + 1): + cells: list[str] = [] + for c in range(min_col, max_col + 1): + cell = ws.cell(row=r, column=c) + v = cell.value + if v is None: + cells.append("") + elif isinstance(v, (int, float)): + cells.append(str(v)) + elif isinstance(v, datetime): + cells.append(v.strftime("%Y-%m-%d %H:%M:%S")) + else: + # escape embedded quotes first; backslashes inside f-string expressions are a syntax error before Python 3.12 + escaped = str(v).replace('"', '""') + cells.append(f'"{escaped}"') + lines.append(",".join(cells)) + csvData = "\n".join(lines) + parts.append(ContentPart( + id=makeId(), + parentId=rootId, + label=f"sheet_{sheetName}", + typeGroup="table", + mimeType="text/csv", + data=csvData, + metadata={"sheet": sheetName, "size": len(csvData.encode('utf-8'))} + )) + + return parts + + diff --git a/modules/services/serviceExtraction/formats/xml_extractor.py b/modules/services/serviceExtraction/formats/xml_extractor.py new file mode 100644 index 00000000..7067924b --- /dev/null +++ b/modules/services/serviceExtraction/formats/xml_extractor.py @@ -0,0 +1,30 @@ +from typing import Any, Dict, List +import xml.etree.ElementTree as ET + +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId +from ..subRegistry import Extractor + + +class XmlExtractor(Extractor): + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return mimeType == "application/xml" or (fileName or "").lower().endswith((".xml", ".rss", ".atom")) + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> List[ContentPart]: + mimeType = context.get("mimeType") or "application/xml" + text = fileBytes.decode("utf-8", errors="replace") + # best-effort parse to validate the XML; the raw text is returned either way + try: + ET.fromstring(text) + except Exception: + pass + return [ContentPart( + id=makeId(), + parentId=None, + label="main", + typeGroup="structure", + mimeType=mimeType, + data=text, + metadata={"size": len(fileBytes)} + )] + + diff --git a/modules/services/serviceExtraction/mainServiceExtraction.py b/modules/services/serviceExtraction/mainServiceExtraction.py new file mode 100644 index 00000000..1f6f36e5 --- /dev/null +++ b/modules/services/serviceExtraction/mainServiceExtraction.py @@ -0,0 +1,57 @@ +from typing import Any, Dict, List, Optional +import uuid + +from .subRegistry import ExtractorRegistry, ChunkerRegistry +from .subPipeline import runExtraction, poolAndLimit, applyAiIfRequested +from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart + + +class ExtractionService: + def __init__(self): + self._extractorRegistry = ExtractorRegistry() + self._chunkerRegistry = ChunkerRegistry() + + def
extractDocuments(self, documentList: List[Dict[str, Any]], options: Dict[str, Any]) -> Any: + processIndividually = options.get("processDocumentsIndividually", True) + + if processIndividually: + results: List[ExtractedContent] = [] + for doc in documentList: + ec = runExtraction( + extractorRegistry=self._extractorRegistry, + chunkerRegistry=self._chunkerRegistry, + documentBytes=doc.get("bytes"), + fileName=doc.get("fileName"), + mimeType=doc.get("mimeType"), + options=options + ) + ec = applyAiIfRequested(ec, options) + results.append(ec) + return results + else: + allParts: List[ContentPart] = [] + for doc in documentList: + ec = runExtraction( + extractorRegistry=self._extractorRegistry, + chunkerRegistry=self._chunkerRegistry, + documentBytes=doc.get("bytes"), + fileName=doc.get("fileName"), + mimeType=doc.get("mimeType"), + options=options + ) + for p in ec.parts: + if "documentId" not in p.metadata: + p.metadata["documentId"] = doc.get("id") or str(uuid.uuid4()) + allParts.extend(ec.parts) + + pooled = poolAndLimit(allParts, self._chunkerRegistry, options) + # In pooled mode we return a dict containing pooled parts and an optional AI output + pooledResult: Dict[str, Any] = { + "parts": pooled, + "summary": {"documents": len(documentList)} + } + aiOut = applyAiIfRequested(ExtractedContent(id=str(uuid.uuid4()), parts=pooled, summary=None), options) + pooledResult["ai"] = aiOut.summary if isinstance(aiOut, ExtractedContent) else aiOut + return pooledResult + + diff --git a/modules/services/serviceExtraction/merging/__init__.py b/modules/services/serviceExtraction/merging/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/services/serviceExtraction/merging/default_merger.py b/modules/services/serviceExtraction/merging/default_merger.py new file mode 100644 index 00000000..ceab6635 --- /dev/null +++ b/modules/services/serviceExtraction/merging/default_merger.py @@ -0,0 +1,11 @@ +from typing import Any, Dict, List +from modules.datamodels.datamodelExtraction import ContentPart + + +class DefaultMerger: + def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + """ + Default merger that passes through parts unchanged. + Used for image, binary, metadata, container typeGroups. + """ + return parts diff --git a/modules/services/serviceExtraction/merging/table_merger.py b/modules/services/serviceExtraction/merging/table_merger.py new file mode 100644 index 00000000..04be404e --- /dev/null +++ b/modules/services/serviceExtraction/merging/table_merger.py @@ -0,0 +1,152 @@ +from typing import Any, Dict, List +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId + + +class TableMerger: + def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + """ + Merge table parts based on strategy. 
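+ Tables are kept separate by default; merging mostly annotates group membership, and oversized tables are chunked on row boundaries.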
+ Strategy options: + - groupBy: "parentId" (default), "documentId", "sheet", "none" + - maxSize: maximum size per merged part + - combineSheets: bool - whether to combine multiple sheets into one table + """ + if not parts: + return parts + + groupBy = strategy.get("groupBy", "parentId") + maxSize = strategy.get("maxSize", 0) + combineSheets = strategy.get("combineSheets", False) + + # Group parts + groups = self._groupParts(parts, groupBy, combineSheets) + + merged: List[ContentPart] = [] + for groupKey, groupParts in groups.items(): + if maxSize > 0: + merged.extend(self._mergeWithSizeLimit(groupParts, maxSize, groupKey)) + else: + merged.extend(self._mergeGroup(groupParts, groupKey)) + + return merged + + def _groupParts(self, parts: List[ContentPart], groupBy: str, combineSheets: bool) -> Dict[str, List[ContentPart]]: + groups: Dict[str, List[ContentPart]] = {} + + for part in parts: + if part.typeGroup != "table": + # Non-table parts go in their own group + key = f"nontable_{part.id}" + if key not in groups: + groups[key] = [] + groups[key].append(part) + continue + + if groupBy == "parentId": + key = part.parentId or "root" + elif groupBy == "documentId": + key = part.metadata.get("documentId", "unknown") + elif groupBy == "sheet" and not combineSheets: + key = part.metadata.get("sheet", "unknown") + else: # "none" or combineSheets=True + key = "all_tables" + + if key not in groups: + groups[key] = [] + groups[key].append(part) + + return groups + + def _mergeGroup(self, parts: List[ContentPart], groupKey: str) -> List[ContentPart]: + if not parts: + return [] + if len(parts) == 1: + return parts + + # For tables, we typically keep them separate unless explicitly combining + # But we can add metadata about the group + for i, part in enumerate(parts): + part.metadata["groupKey"] = groupKey + part.metadata["groupIndex"] = i + part.metadata["groupSize"] = len(parts) + + return parts + + def _mergeWithSizeLimit(self, parts: List[ContentPart], maxSize: int, groupKey: str) -> List[ContentPart]: + if not parts: + return [] + + # For tables, we typically don't merge across different tables + # Instead, we chunk individual large tables + merged: List[ContentPart] = [] + + for part in parts: + partSize = part.metadata.get("size", 0) + + if partSize <= maxSize: + # Part fits within limit + part.metadata["groupKey"] = groupKey + merged.append(part) + else: + # Chunk the large table + chunks = self._chunkTable(part, maxSize) + merged.extend(chunks) + + return merged + + def _chunkTable(self, part: ContentPart, maxSize: int) -> List[ContentPart]: + """Chunk a large table by rows while preserving CSV structure.""" + lines = part.data.split('\n') + if not lines: + return [part] + + chunks: List[ContentPart] = [] + currentChunk: List[str] = [] + currentSize = 0 + + for line in lines: + lineSize = len(line.encode('utf-8')) + 1 # +1 for newline + + if currentSize + lineSize > maxSize and currentChunk: + # Flush current chunk + chunkData = '\n'.join(currentChunk) + chunks.append(ContentPart( + id=makeId(), + parentId=part.parentId, + label=f"{part.label}_chunk_{len(chunks)}", + typeGroup="table", + mimeType=part.mimeType, + data=chunkData, + metadata={ + "size": len(chunkData.encode('utf-8')), + "chunk": True, + "originalPart": part.id, + "chunkIndex": len(chunks) + } + )) + currentChunk = [line] + currentSize = lineSize + else: + currentChunk.append(line) + currentSize += lineSize + + # Flush remaining chunk + if currentChunk: + chunkData = '\n'.join(currentChunk) + chunks.append(ContentPart( + 
id=makeId(), + parentId=part.parentId, + label=f"{part.label}_chunk_{len(chunks)}", + typeGroup="table", + mimeType=part.mimeType, + data=chunkData, + metadata={ + "size": len(chunkData.encode('utf-8')), + "chunk": True, + "originalPart": part.id, + "chunkIndex": len(chunks) + } + )) + + return chunks diff --git a/modules/services/serviceExtraction/merging/text_merger.py b/modules/services/serviceExtraction/merging/text_merger.py new file mode 100644 index 00000000..bb9e850d --- /dev/null +++ b/modules/services/serviceExtraction/merging/text_merger.py @@ -0,0 +1,136 @@ +from typing import Any, Dict, List +from modules.datamodels.datamodelExtraction import ContentPart +from ..utils import makeId + + +class TextMerger: + def merge(self, parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + """ + Merge text parts based on strategy. + Strategy options: + - groupBy: "parentId" (default), "documentId", "none" + - orderBy: "label", "pageIndex", "sheetIndex", "none" + - maxSize: maximum size per merged part + """ + if not parts: + return parts + + groupBy = strategy.get("groupBy", "parentId") + orderBy = strategy.get("orderBy", "label") + maxSize = strategy.get("maxSize", 0) + + # Group parts + groups = self._groupParts(parts, groupBy) + + merged: List[ContentPart] = [] + for groupKey, groupParts in groups.items(): + # Sort within group + sortedParts = self._sortParts(groupParts, orderBy) + + # Merge respecting maxSize + if maxSize > 0: + merged.extend(self._mergeWithSizeLimit(sortedParts, maxSize)) + else: + merged.extend(self._mergeGroup(sortedParts, groupKey)) + + return merged + + def _groupParts(self, parts: List[ContentPart], groupBy: str) -> Dict[str, List[ContentPart]]: + groups: Dict[str, List[ContentPart]] = {} + + for part in parts: + if part.typeGroup != "text": + # Non-text parts go in their own group + key = f"nontext_{part.id}" + if key not in groups: + groups[key] = [] + groups[key].append(part) + continue + + if groupBy == "parentId": + key = part.parentId or "root" + elif groupBy == "documentId": + key = part.metadata.get("documentId", "unknown") + else: # "none" + key = "all" + + if key not in groups: + groups[key] = [] + groups[key].append(part) + + return groups + + def _sortParts(self, parts: List[ContentPart], orderBy: str) -> List[ContentPart]: + if orderBy == "pageIndex": + return sorted(parts, key=lambda p: p.metadata.get("pageIndex", 0)) + elif orderBy == "sheetIndex": + return sorted(parts, key=lambda p: p.metadata.get("sheetIndex", 0)) + elif orderBy == "label": + return sorted(parts, key=lambda p: p.label) + else: # "none" + return parts + + def _mergeGroup(self, parts: List[ContentPart], groupKey: str) -> List[ContentPart]: + if not parts: + return [] + if len(parts) == 1: + return parts + + # Merge all text parts in group + textParts = [p for p in parts if p.typeGroup == "text"] + nonTextParts = [p for p in parts if p.typeGroup != "text"] + + if not textParts: + return nonTextParts + + # Combine text data + combinedData = "\n".join([p.data for p in textParts]) + totalSize = sum(p.metadata.get("size", 0) for p in textParts) + + mergedPart = ContentPart( + id=makeId(), + parentId=textParts[0].parentId, + label=f"merged_{groupKey}", + typeGroup="text", + mimeType="text/plain", + data=combinedData, + metadata={ + "size": totalSize, + "merged": len(textParts), + "originalParts": [p.id for p in textParts] + } + ) + + return [mergedPart] + nonTextParts + + def _mergeWithSizeLimit(self, parts: List[ContentPart], maxSize: int) -> List[ContentPart]: + if 
not parts: + return [] + + textParts = [p for p in parts if p.typeGroup == "text"] + nonTextParts = [p for p in parts if p.typeGroup != "text"] + + if not textParts: + return nonTextParts + + merged: List[ContentPart] = [] + currentGroup: List[ContentPart] = [] + currentSize = 0 + + for part in textParts: + partSize = part.metadata.get("size", 0) + + if currentSize + partSize > maxSize and currentGroup: + # Flush current group + merged.extend(self._mergeGroup(currentGroup, f"chunk_{len(merged)}")) + currentGroup = [part] + currentSize = partSize + else: + currentGroup.append(part) + currentSize += partSize + + # Flush remaining group + if currentGroup: + merged.extend(self._mergeGroup(currentGroup, f"chunk_{len(merged)}")) + + return merged + nonTextParts diff --git a/modules/services/serviceExtraction/subPipeline.py b/modules/services/serviceExtraction/subPipeline.py new file mode 100644 index 00000000..4441e9c4 --- /dev/null +++ b/modules/services/serviceExtraction/subPipeline.py @@ -0,0 +1,186 @@ +from typing import Any, Dict, List + +from modules.datamodels.datamodelExtraction import ExtractedContent, ContentPart +from .utils import makeId +from .subRegistry import ExtractorRegistry, ChunkerRegistry +from .merging.text_merger import TextMerger +from .merging.table_merger import TableMerger +from .merging.default_merger import DefaultMerger + + +def runExtraction(extractorRegistry: ExtractorRegistry, chunkerRegistry: ChunkerRegistry, documentBytes: bytes, fileName: str, mimeType: str, options: Dict[str, Any]) -> ExtractedContent: + extractor = extractorRegistry.resolve(mimeType, fileName) + if extractor is None: + # fallback: single binary part + part = ContentPart( + id=makeId(), + parentId=None, + label="file", + typeGroup="binary", + mimeType=mimeType or "application/octet-stream", + data="", + metadata={"warning": "No extractor registered"} + ) + return ExtractedContent(id=makeId(), parts=[part]) + + parts = extractor.extract(documentBytes, {"fileName": fileName, "mimeType": mimeType, "options": options}) + # Optional merge step + mergeStrategy = options.get("mergeStrategy", {}) + if mergeStrategy: + parts = _mergeParts(parts, mergeStrategy) + return ExtractedContent(id=makeId(), parts=parts) + + +def poolAndLimit(parts: List[ContentPart], chunkerRegistry: ChunkerRegistry, options: Dict[str, Any]) -> List[ContentPart]: + maxSize = int(options.get("maxSize", 0) or 0) + chunkAllowed = bool(options.get("chunkAllowed", False)) + mergeStrategy = options.get("mergeStrategy", {}) + + if maxSize <= 0: + # Still apply merging if strategy provided + if mergeStrategy: + return _applyMerging(parts, mergeStrategy) + return parts + + # First, try to fit within size limit + current = 0 + kept: List[ContentPart] = [] + remaining: List[ContentPart] = [] + + for p in parts: + size = int(p.metadata.get("size", 0) or 0) + if current + size <= maxSize: + kept.append(p) + current += size + else: + remaining.append(p) + + # If we have remaining parts and chunking is allowed, try chunking + if remaining and chunkAllowed: + for p in remaining: + if p.typeGroup in ("text", "table", "structure"): + chunks = chunkerRegistry.resolve(p.typeGroup).chunk(p, options) + for ch in chunks: + chSize = int(ch.get("size", 0) or 0) + if current + chSize <= maxSize: + kept.append(ContentPart( + id=makeId(), + parentId=p.id, + label=f"chunk_{ch.get('order', 0)}", + typeGroup=p.typeGroup, + mimeType=p.mimeType, + data=ch.get("data", ""), + metadata={"size": chSize, "chunk": True} + )) + current += chSize + else: + break + + 
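# parts and chunks are admitted greedily in document order until the byte budget is exhausted; anything left over is dropped +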
# Apply merging strategy if provided + if mergeStrategy: + kept = _applyMerging(kept, mergeStrategy) + + # Re-check size after merging + totalSize = sum(int(p.metadata.get("size", 0) or 0) for p in kept) + if totalSize > maxSize and mergeStrategy.get("maxSize"): + # Apply size limit to merged parts + kept = _applySizeLimit(kept, maxSize) + + return kept + + +def _applyMerging(parts: List[ContentPart], strategy: Dict[str, Any]) -> List[ContentPart]: + """Apply merging strategy to parts.""" + textMerger = TextMerger() + tableMerger = TableMerger() + defaultMerger = DefaultMerger() + + # Group by typeGroup + textParts = [p for p in parts if p.typeGroup == "text"] + tableParts = [p for p in parts if p.typeGroup == "table"] + structureParts = [p for p in parts if p.typeGroup == "structure"] + otherParts = [p for p in parts if p.typeGroup not in ("text", "table", "structure")] + + merged: List[ContentPart] = [] + + if textParts: + merged.extend(textMerger.merge(textParts, strategy)) + if tableParts: + merged.extend(tableMerger.merge(tableParts, strategy)) + if structureParts: + # For now, treat structure like text + merged.extend(textMerger.merge(structureParts, strategy)) + if otherParts: + merged.extend(defaultMerger.merge(otherParts, strategy)) + + return merged + + +def _applySizeLimit(parts: List[ContentPart], maxSize: int) -> List[ContentPart]: + """Apply size limit by prioritizing parts and truncating if necessary.""" + # Sort by priority: text first, then others + priority_order = {"text": 0, "table": 1, "structure": 2, "image": 3, "binary": 4, "metadata": 5, "container": 6} + sorted_parts = sorted(parts, key=lambda p: priority_order.get(p.typeGroup, 99)) + + kept: List[ContentPart] = [] + current_size = 0 + + for part in sorted_parts: + part_size = int(part.metadata.get("size", 0) or 0) + if current_size + part_size <= maxSize: + kept.append(part) + current_size += part_size + else: + # Try to truncate text parts + if part.typeGroup == "text" and part_size > 0: + remaining_size = maxSize - current_size + if remaining_size > 1000: # Only truncate if we have meaningful space + truncated_data = part.data[:remaining_size] # sizes here are byte counts; one char per byte keeps ASCII text within budget + truncated_part = ContentPart( + id=makeId(), + parentId=part.parentId, + label=f"{part.label}_truncated", + typeGroup=part.typeGroup, + mimeType=part.mimeType, + data=truncated_data, + metadata={**part.metadata, "size": len(truncated_data.encode('utf-8')), "truncated": True} + ) + kept.append(truncated_part) + break + + return kept + + +def applyAiIfRequested(extracted: ExtractedContent, options: Dict[str, Any]) -> ExtractedContent: + """ + Apply AI processing if requested in options. + This is a placeholder for actual AI integration.
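+ For now it only annotates matching parts with ai_processed/operation_type metadata; no model call is made here.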
+ """ + prompt = options.get("prompt") + operationType = options.get("operationType", "general") + + if not prompt: + return extracted + + # Placeholder AI processing based on operationType + if operationType == "analyse_content": + # Add analysis metadata to parts + for part in extracted.parts: + if part.typeGroup in ("text", "table", "structure"): + part.metadata["ai_processed"] = True + part.metadata["operation_type"] = operationType + elif operationType == "generate_plan": + # Add plan generation metadata + for part in extracted.parts: + if part.typeGroup == "text": + part.metadata["ai_processed"] = True + part.metadata["operation_type"] = operationType + elif operationType == "generate_content": + # Add content generation metadata + for part in extracted.parts: + part.metadata["ai_processed"] = True + part.metadata["operation_type"] = operationType + + return extracted + + diff --git a/modules/services/serviceExtraction/subRegistry.py b/modules/services/serviceExtraction/subRegistry.py new file mode 100644 index 00000000..ceb9b1b5 --- /dev/null +++ b/modules/services/serviceExtraction/subRegistry.py @@ -0,0 +1,103 @@ +from typing import Any, Dict, Optional + +from .types import ContentPart + + +class Extractor: + def detect(self, fileName: str, mimeType: str, headBytes: bytes) -> bool: + return False + + def extract(self, fileBytes: bytes, context: Dict[str, Any]) -> list[ContentPart]: + raise NotImplementedError + + +class Chunker: + def chunk(self, part: ContentPart, options: Dict[str, Any]) -> list[Dict[str, Any]]: + return [] + + +class ExtractorRegistry: + def __init__(self): + self._map: Dict[str, Extractor] = {} + self._fallback: Optional[Extractor] = None + # Register built-ins + try: + from .formats.text_extractor import TextExtractor + from .formats.csv_extractor import CsvExtractor + from .formats.json_extractor import JsonExtractor + from .formats.xml_extractor import XmlExtractor + from .formats.html_extractor import HtmlExtractor + from .formats.pdf_extractor import PdfExtractor + from .formats.docx_extractor import DocxExtractor + from .formats.xlsx_extractor import XlsxExtractor + from .formats.image_extractor import ImageExtractor + from .formats.binary_extractor import BinaryExtractor + self.register("text/plain", TextExtractor()) + self.register("text/markdown", TextExtractor()) + self.register("text/csv", CsvExtractor()) + self.register("application/json", JsonExtractor()) + self.register("application/xml", XmlExtractor()) + self.register("text/html", HtmlExtractor()) + self.register("application/pdf", PdfExtractor()) + self.register("application/vnd.openxmlformats-officedocument.wordprocessingml.document", DocxExtractor()) + self.register("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", XlsxExtractor()) + # images + self.register("image/jpeg", ImageExtractor()) + self.register("image/png", ImageExtractor()) + self.register("image/gif", ImageExtractor()) + # extension fallbacks + self.register("txt", TextExtractor()) + self.register("md", TextExtractor()) + self.register("csv", CsvExtractor()) + self.register("json", JsonExtractor()) + self.register("xml", XmlExtractor()) + self.register("html", HtmlExtractor()) + self.register("htm", HtmlExtractor()) + self.register("pdf", PdfExtractor()) + self.register("docx", DocxExtractor()) + self.register("xlsx", XlsxExtractor()) + self.register("xlsm", XlsxExtractor()) + # fallback + self.setFallback(BinaryExtractor()) + except Exception: + pass + + def register(self, key: str, extractor: Extractor): + 
+        self._map[key] = extractor
+
+    def setFallback(self, extractor: Extractor):
+        self._fallback = extractor
+
+    def resolve(self, mimeType: str, fileName: str) -> Optional[Extractor]:
+        if mimeType in self._map:
+            return self._map[mimeType]
+        # simple extension fallback
+        if "." in fileName:
+            ext = fileName.lower().rsplit(".", 1)[-1]
+            if ext in self._map:
+                return self._map[ext]
+        return self._fallback
+
+
+class ChunkerRegistry:
+    def __init__(self):
+        self._map: Dict[str, Chunker] = {}
+        self._noop = Chunker()
+        # Register default chunkers
+        try:
+            from .chunking.text_chunker import TextChunker
+            from .chunking.table_chunker import TableChunker
+            from .chunking.structure_chunker import StructureChunker
+            self.register("text", TextChunker())
+            self.register("table", TableChunker())
+            self.register("structure", StructureChunker())
+        except Exception as e:
+            logger.warning("Failed to register default chunkers: %s", e)
+
+    def register(self, typeGroup: str, chunker: Chunker):
+        self._map[typeGroup] = chunker
+
+    def resolve(self, typeGroup: str) -> Chunker:
+        return self._map.get(typeGroup, self._noop)
+
+
diff --git a/modules/services/serviceExtraction/utils/__init__.py b/modules/services/serviceExtraction/utils/__init__.py
new file mode 100644
index 00000000..a16d3f59
--- /dev/null
+++ b/modules/services/serviceExtraction/utils/__init__.py
@@ -0,0 +1,7 @@
+import uuid
+
+
+def makeId() -> str:
+    return str(uuid.uuid4())
+
+
diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py
index a3111b54..033a89b6 100644
--- a/modules/workflows/methods/methodAi.py
+++ b/modules/workflows/methods/methodAi.py
@@ -9,6 +9,7 @@ from datetime import datetime, UTC
 from modules.workflows.methods.methodBase import MethodBase, action
 from modules.datamodels.datamodelWorkflow import ActionResult
+from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
 
 logger = logging.getLogger(__name__)
 
@@ -32,7 +33,7 @@ class MethodAi(MethodBase):
         Parameters:
             aiPrompt (str): The AI prompt for processing
             documentList (list, optional): List of document references to include in context
-            expectedDocumentFormats (list, optional): Expected output formats with extension, mimeType, description
+            expectedDocumentFormat (dict, optional): Expected document output format with extension, mimeType, description
             processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic')
             includeMetadata (bool, optional): Whether to include metadata (default: True)
             operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation'
@@ -46,7 +47,7 @@
         documentList = parameters.get("documentList", [])
         if isinstance(documentList, str):
             documentList = [documentList]
-        expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
+        expectedDocumentFormat = parameters.get("expectedDocumentFormat", {})
         processingMode = parameters.get("processingMode", "basic")
         includeMetadata = parameters.get("includeMetadata", True)
         operationType = parameters.get("operationType", "general")
@@ -64,8 +65,7 @@
         output_extension = ".txt"  # Default
         output_mime_type = "text/plain"  # Default
 
-        if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
-            expected_format = expectedDocumentFormats[0]
-            output_extension = expected_format.get("extension", ".txt")
-            output_mime_type = expected_format.get("mimeType", "text/plain")
+        if expectedDocumentFormat:
+            output_extension = expectedDocumentFormat.get("extension", ".txt")
+            output_mime_type = expectedDocumentFormat.get("mimeType", "text/plain")
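+            # Illustrative value (keys as read above):
+            # expectedDocumentFormat = {"extension": ".json", "mimeType": "application/json"}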
logger.info(f"Using expected format: {output_extension} ({output_mime_type})") @@ -205,26 +205,19 @@ class MethodAi(MethodBase): output_format = output_extension.replace('.', '') or 'txt' - # Build options from parameters - options = { - "process_type": "text", - "operation_type": operationType, - "priority": priority, - "compress_prompt": processingMode != "detailed", - "compress_documents": True, - "process_documents_individually": True, - "processing_mode": processingMode, - "result_format_requested": output_format, - "include_metadata": includeMetadata - } - - # Add optional parameters if provided - if maxCost is not None: - options["max_cost"] = maxCost - if maxProcessingTime is not None: - options["max_processing_time"] = maxProcessingTime - if requiredTags is not None: - options["required_tags"] = requiredTags + # Build options using new AiCallOptions format + options = AiCallOptions( + operationType=operationType, + priority=priority, + compressPrompt=processingMode != "detailed", + compressContext=True, + processDocumentsIndividually=True, + processingMode=processingMode, + resultFormat=output_format, + maxCost=maxCost, + maxProcessingTime=maxProcessingTime, + requiredTags=requiredTags + ) result = await self.services.ai.callAi( prompt=call_prompt, @@ -260,19 +253,17 @@ class MethodAi(MethodBase): result = await self.services.ai.callAi( prompt=guardrail_prompt, documents=context or None, - options={ - "process_type": "text", - "operation_type": "generate_content", - "priority": "quality", - "compress_prompt": False, - "compress_documents": True, - "process_documents_individually": True, - "processing_mode": "detailed", - "result_format_requested": "json", - "include_metadata": False, - "max_cost": 0.03, - "max_processing_time": 30 - } + options=AiCallOptions( + operationType=OperationType.GENERATE_CONTENT, + priority=Priority.QUALITY, + compressPrompt=False, + compressContext=True, + processDocumentsIndividually=True, + processingMode="detailed", + resultFormat="json", + maxCost=0.03, + maxProcessingTime=30 + ) ) except Exception: result = cleaned # fallback to first attempt diff --git a/modules/workflows/methods/methodDocument.py b/modules/workflows/methods/methodDocument.py index 7f4239e6..61b4741b 100644 --- a/modules/workflows/methods/methodDocument.py +++ b/modules/workflows/methods/methodDocument.py @@ -10,6 +10,7 @@ from datetime import datetime, UTC from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelWorkflow import ActionResult, ChatDocument +from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority logger = logging.getLogger(__name__) @@ -532,14 +533,13 @@ class MethodDocument(MethodBase): formatted_content = await self.services.ai.callAi( prompt=ai_prompt, documents=None, - options={ - "process_type": "text", - "operation_type": "generate_content", - "priority": "speed", - "compress_prompt": True, - "compress_documents": False, - "max_cost": 0.02 - } + options=AiCallOptions( + operationType=OperationType.GENERATE_CONTENT, + priority=Priority.SPEED, + compressPrompt=True, + compressContext=False, + maxCost=0.02 + ) ) if not formatted_content or formatted_content.strip() == "": @@ -776,19 +776,17 @@ SOURCE DOCUMENT CONTENT: aiReport = await self.services.ai.callAi( prompt=aiPrompt, documents=documents or None, - options={ - "process_type": "text", - "operation_type": "report_generation", - "priority": "quality", - "compress_prompt": False, - "compress_documents": True, - 
"process_documents_individually": True, - "result_format_requested": "html", - "include_metadata": includeMetadata, - "processing_mode": "detailed", - "max_cost": 0.08, - "max_processing_time": 90 - } + options=AiCallOptions( + operationType=OperationType.GENERATE_CONTENT, # Using GENERATE_CONTENT for report generation + priority=Priority.QUALITY, + compressPrompt=False, + compressContext=True, + processDocumentsIndividually=True, + resultFormat="html", + processingMode="detailed", + maxCost=0.08, + maxProcessingTime=90 + ) ) # If AI call fails, return error - AI is crucial for report generation diff --git a/modules/workflows/methods/methodOutlook.py b/modules/workflows/methods/methodOutlook.py index c44bf405..02fca00b 100644 --- a/modules/workflows/methods/methodOutlook.py +++ b/modules/workflows/methods/methodOutlook.py @@ -12,7 +12,9 @@ import uuid import requests from modules.workflows.methods.methodBase import MethodBase, action -from modules.datamodels.datamodelWorkflow import ActionResult, ChatDocument +from modules.datamodels.datamodelWorkflow import ActionResult +from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority +from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelUam import ConnectionStatus logger = logging.getLogger(__name__) @@ -1519,17 +1521,15 @@ class MethodOutlook(MethodBase): composed_email = await self.services.ai.callAi( prompt=ai_prompt, documents=documents or None, - options={ - "process_type": "text", - "operation_type": "email_composition", - "priority": "speed", - "compress_prompt": True, - "compress_documents": True, - "process_documents_individually": False, - "include_metadata": True, - "max_cost": 0.02, - "max_processing_time": 15 - } + options=AiCallOptions( + operationType=OperationType.GENERATE_CONTENT, # Using GENERATE_CONTENT for email composition + priority=Priority.SPEED, + compressPrompt=True, + compressContext=True, + processDocumentsIndividually=False, + maxCost=0.02, + maxProcessingTime=15 + ) ) # Parse the AI response to ensure it's valid JSON diff --git a/modules/workflows/methods/methodWeb.py b/modules/workflows/methods/methodWeb.py index cd8a4afd..69ccee06 100644 --- a/modules/workflows/methods/methodWeb.py +++ b/modules/workflows/methods/methodWeb.py @@ -5,6 +5,7 @@ import json as _json from typing import Any, Dict from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelWorkflow import ActionResult, ActionDocument +from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority from modules.datamodels.datamodelWeb import ( WebSearchRequest, WebCrawlRequest, @@ -278,16 +279,15 @@ class MethodWeb(MethodBase): summary = await self.services.ai.callAi( prompt=prompt, documents=None, - options={ - "process_type": "text", - "operation_type": "analyse_content", - "priority": "balanced", - "compress_prompt": True, - "compress_documents": False, - "processing_mode": "advanced", - "max_cost": 0.05, - "max_processing_time": 30 - } + options=AiCallOptions( + operationType=OperationType.ANALYSE_CONTENT, + priority=Priority.BALANCED, + compressPrompt=True, + compressContext=False, + processingMode="advanced", + maxCost=0.05, + maxProcessingTime=30 + ) ) summary = summary.strip() except Exception: diff --git a/modules/workflows/processing/handlingTasks.py b/modules/workflows/processing/handlingTasks.py index f1fb7b94..fbd6163e 100644 --- a/modules/workflows/processing/handlingTasks.py +++ 
b/modules/workflows/processing/handlingTasks.py @@ -33,6 +33,20 @@ from modules.workflows.processing.promptFactory import ( createActionParameterPrompt, createRefinementPrompt ) +from modules.workflows.processing.promptFactoryPlaceholders import ( + createTaskPlanningPromptTemplate, + createActionDefinitionPromptTemplate, + createActionSelectionPromptTemplate, + createActionParameterPromptTemplate, + createRefinementPromptTemplate, + createResultReviewPromptTemplate, + extractUserPrompt, + extractAvailableDocuments, + extractWorkflowHistory, + extractAvailableMethods, + extractUserLanguage, + extractReviewContent +) from modules.services.serviceDocument.mainServiceDocumentGeneration import DocumentGenerationService from modules.workflows.processing.promptFactory import methods from modules.workflows.processing.executionState import should_continue @@ -117,16 +131,28 @@ class HandlingTasks: } ) - # Generate the task planning prompt - task_planning_prompt = createTaskPlanningPrompt(task_planning_context, self.service) + # Generate the task planning prompt with placeholders + task_planning_prompt_template = createTaskPlanningPromptTemplate() + + # Extract content for placeholders + user_prompt = extractUserPrompt(task_planning_context) + available_documents = extractAvailableDocuments(task_planning_context) + workflow_history = extractWorkflowHistory(self.service, task_planning_context) + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "AVAILABLE_DOCUMENTS": available_documents, + "WORKFLOW_HISTORY": workflow_history + } # Log task planning prompt sent to AI logger.info("=== TASK PLANNING PROMPT SENT TO AI ===") # Trace task planning prompt - self.writeTraceLog("Task Plan Prompt", task_planning_prompt) - - # Centralized AI call: Task planning (quality, detailed) + self.writeTraceLog("Task Plan Prompt", task_planning_prompt_template) + self.writeTraceLog("Task Plan Placeholders", placeholders) + # Centralized AI call: Task planning (quality, detailed) with placeholders options = AiCallOptions( operationType=OperationType.GENERATE_PLAN, priority=Priority.QUALITY, @@ -137,9 +163,9 @@ class HandlingTasks: maxProcessingTime=30 ) - prompt = await self.services.ai.callAiText( - prompt=task_planning_prompt, - documents=None, + prompt = await self.services.ai.callAi( + prompt=task_planning_prompt_template, + placeholders=placeholders, options=options ) @@ -382,12 +408,30 @@ class HandlingTasks: # Check workflow status before calling AI service self._checkWorkflowStopped() - # Generate the action definition prompt - action_prompt = await createActionDefinitionPrompt(action_context, self.service) - # Trace action planning prompt - self.writeTraceLog("Action Plan Prompt", action_prompt) + # Generate the action definition prompt with placeholders + action_prompt_template = createActionDefinitionPromptTemplate() - # Centralized AI call: Action planning (quality, detailed) + # Extract content for placeholders + user_prompt = extractUserPrompt(action_context) + available_documents = extractAvailableDocuments(action_context) + workflow_history = extractWorkflowHistory(self.service, action_context) + available_methods = extractAvailableMethods(self.service) + user_language = extractUserLanguage(self.service) + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "AVAILABLE_DOCUMENTS": available_documents, + "WORKFLOW_HISTORY": workflow_history, + "AVAILABLE_METHODS": available_methods, + "USER_LANGUAGE": user_language + } + + # Trace action 
planning prompt + self.writeTraceLog("Action Plan Prompt", action_prompt_template) + self.writeTraceLog("Action Plan Placeholders", placeholders) + + # Centralized AI call: Action planning (quality, detailed) with placeholders options = AiCallOptions( operationType=OperationType.GENERATE_PLAN, priority=Priority.QUALITY, @@ -398,9 +442,9 @@ class HandlingTasks: maxProcessingTime=30 ) - prompt = await self.services.ai.callAiText( - prompt=action_prompt, - documents=None, + prompt = await self.services.ai.callAi( + prompt=action_prompt_template, + placeholders=placeholders, options=options ) @@ -478,8 +522,25 @@ class HandlingTasks: async def plan_select(self, context: TaskContext) -> Dict[str, Any]: """Plan: select exactly one action. Returns {"action": {method, name}}""" - prompt = createActionSelectionPrompt(context, self.service) - self.writeTraceLog("React Plan Selection Prompt", prompt) + prompt_template = createActionSelectionPromptTemplate() + + # Extract content for placeholders + user_prompt = extractUserPrompt(context) + available_documents = extractAvailableDocuments(context) + user_language = extractUserLanguage(self.service) + available_methods = extractAvailableMethods(self.service) + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "AVAILABLE_DOCUMENTS": available_documents, + "USER_LANGUAGE": user_language, + "AVAILABLE_METHODS": available_methods + } + + self.writeTraceLog("React Plan Selection Prompt", prompt_template) + self.writeTraceLog("React Plan Selection Placeholders", placeholders) + # Centralized AI call for plan selection (use plan generation quality) options = AiCallOptions( operationType=OperationType.GENERATE_PLAN, @@ -491,9 +552,9 @@ class HandlingTasks: maxProcessingTime=30 ) - response = await self.services.ai.callAiText( - prompt=prompt, - documents=None, + response = await self.services.ai.callAi( + prompt=prompt_template, + placeholders=placeholders, options=options ) self.writeTraceLog("React Plan Selection Response", response) @@ -509,8 +570,35 @@ class HandlingTasks: async def act_execute(self, context: TaskContext, selection: Dict[str, Any], task_step: TaskStep, workflow, step_index: int) -> ActionResult: """Act: request minimal parameters then execute selected action.""" action = selection.get('action', {}) - params_prompt = createActionParameterPrompt(context, action, self.service) - self.writeTraceLog("React Parameters Prompt", params_prompt) + prompt_template = createActionParameterPromptTemplate() + + # Extract content for placeholders + user_prompt = extractUserPrompt(context) + available_documents = extractAvailableDocuments(context) + user_language = extractUserLanguage(self.service) + + # Get action signature + method = action.get('method', '') + name = action.get('name', '') + action_signature = "" + if self.service and method in methods: + method_instance = methods[method]['instance'] + action_signature = method_instance.getActionSignature(name) + + selected_action = f"{method}.{name}" + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "AVAILABLE_DOCUMENTS": available_documents, + "USER_LANGUAGE": user_language, + "SELECTED_ACTION": selected_action, + "ACTION_SIGNATURE": action_signature + } + + self.writeTraceLog("React Parameters Prompt", prompt_template) + self.writeTraceLog("React Parameters Placeholders", placeholders) + # Centralized AI call for parameter suggestion (balanced analysis) options = AiCallOptions( operationType=OperationType.ANALYSE_CONTENT, @@ 
-522,9 +610,9 @@ class HandlingTasks: maxProcessingTime=30 ) - params_resp = await self.services.ai.callAiText( - prompt=params_prompt, - documents=None, + params_resp = await self.services.ai.callAi( + prompt=prompt_template, + placeholders=placeholders, options=options ) self.writeTraceLog("React Parameters Response", params_resp) @@ -578,8 +666,21 @@ class HandlingTasks: async def refine_decide(self, context: TaskContext, observation: Dict[str, Any]) -> Dict[str, Any]: """Refine: decide continue or stop, with reason""" - prompt = createRefinementPrompt(context, observation) - self.writeTraceLog("React Refinement Prompt", prompt) + prompt_template = createRefinementPromptTemplate() + + # Extract content for placeholders + user_prompt = extractUserPrompt(context) + review_content = extractReviewContent(type('Context', (), {'observation': observation})()) + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "REVIEW_CONTENT": review_content + } + + self.writeTraceLog("React Refinement Prompt", prompt_template) + self.writeTraceLog("React Refinement Placeholders", placeholders) + # Centralized AI call for refinement decision (balanced analysis) options = AiCallOptions( operationType=OperationType.ANALYSE_CONTENT, @@ -591,9 +692,9 @@ class HandlingTasks: maxProcessingTime=30 ) - resp = await self.services.ai.callAiText( - prompt=prompt, - documents=None, + resp = await self.services.ai.callAi( + prompt=prompt_template, + placeholders=placeholders, options=options ) self.writeTraceLog("React Refinement Response", resp) @@ -1100,8 +1201,18 @@ class HandlingTasks: # Check workflow status before calling AI service self._checkWorkflowStopped() - # Use promptFactory for review prompt - prompt = createResultReviewPrompt(review_context, self.service) + # Use placeholder-based review prompt + prompt_template = createResultReviewPromptTemplate() + + # Extract content for placeholders + user_prompt = extractUserPrompt(review_context) + review_content = extractReviewContent(review_context) + + # Create placeholders dictionary + placeholders = { + "USER_PROMPT": user_prompt, + "REVIEW_CONTENT": review_content + } # Log result review prompt sent to AI logger.info("=== RESULT REVIEW PROMPT SENT TO AI ===") @@ -1109,9 +1220,10 @@ class HandlingTasks: logger.info(f"Action Results Count: {len(review_context.action_results) if review_context.action_results else 0}") logger.info(f"Task Actions Count: {len(review_context.task_actions) if review_context.task_actions else 0}") # Trace result review prompt - self.writeTraceLog("Result Review Prompt", prompt) + self.writeTraceLog("Result Review Prompt", prompt_template) + self.writeTraceLog("Result Review Placeholders", placeholders) - # Centralized AI call: Result validation (balanced analysis) + # Centralized AI call: Result validation (balanced analysis) with placeholders options = AiCallOptions( operationType=OperationType.ANALYSE_CONTENT, priority=Priority.BALANCED, @@ -1122,9 +1234,9 @@ class HandlingTasks: maxProcessingTime=30 ) - response = await self.services.ai.callAiText( - prompt=prompt, - documents=None, + response = await self.services.ai.callAi( + prompt=prompt_template, + placeholders=placeholders, options=options ) diff --git a/modules/workflows/processing/promptFactoryPlaceholders.py b/modules/workflows/processing/promptFactoryPlaceholders.py new file mode 100644 index 00000000..de2ef6a7 --- /dev/null +++ b/modules/workflows/processing/promptFactoryPlaceholders.py @@ -0,0 +1,418 @@ +""" +Placeholder-based prompt 
factory for dynamic AI calls. +This module provides prompt templates with placeholders that can be filled dynamically. +""" + +import json +from typing import Dict, Any +from modules.workflows.processing.promptFactory import ( + _getAvailableDocuments, + _getPreviousRoundContext, + getMethodsList, + getEnhancedDocumentContext, + _getConnectionReferenceList, + methods +) + + +def createTaskPlanningPromptTemplate() -> str: + """Create task planning prompt template with placeholders.""" + return """You are a task planning AI that analyzes user requests and creates structured, self-contained task plans with user-friendly feedback messages. + +USER REQUEST: {{KEY:USER_PROMPT}} + +AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}} + +PREVIOUS WORKFLOW ROUNDS CONTEXT: +{{KEY:WORKFLOW_HISTORY}} + +INSTRUCTIONS: +1. Analyze the user request, available documents, and previous workflow rounds context +2. If the user request appears to be a follow-up (like "try again", "versuche es nochmals", "retry", etc.), + use the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what the user wants to retry or continue +3. Group related topics and sequential steps into single, comprehensive tasks +4. Focus on business outcomes, not technical operations +5. Make each task self-contained: clearly state what to do and what outputs are expected +6. Ensure proper handover between tasks (later actions will use your task outputs) +7. Detect the language of the user request and include it in languageUserDetected +8. Generate user-friendly messages for each task in the user's request language +9. Return a JSON object with the exact structure shown below + +TASK GROUPING PRINCIPLES: +- COMBINE RELATED TOPICS: Group related subjects, sequential steps, or workflow-structured activities into single tasks +- SEQUENTIAL WORKFLOWS: If the user says "first do this, then that, then that" → create ONE task that handles the entire sequence +- SIMILAR CONTENT: If multiple items deal with the same subject matter → combine into ONE comprehensive task +- ONLY SPLIT WHEN DIFFERENT: Create separate tasks ONLY when the user explicitly wants different, independent things + +EXAMPLES OF GOOD TASK GROUPING: + +COMBINE INTO ONE TASK: +- "Analyze the documents, extract key insights, and create a summary report" → ONE task: "Analyze documents and create comprehensive summary report" +- "First check my emails, then respond to urgent ones, then organize my inbox" → ONE task: "Process and organize email inbox with priority responses" +- "Review the budget, analyze spending patterns, and suggest cost-cutting measures" → ONE task: "Comprehensive budget analysis with optimization recommendations" +- "Create a business strategy, develop marketing plan, and prepare presentation" → ONE task: "Develop complete business strategy with marketing plan and presentation" + +SPLIT INTO MULTIPLE TASKS: +- "Create a business strategy for Q4" AND "Check my emails for messages from my assistant" → TWO separate tasks (different subjects) +- "Analyze customer feedback" AND "Prepare quarterly financial report" → TWO separate tasks (different business areas) +- "Review project timeline" AND "Update employee handbook" → TWO separate tasks (unrelated activities) + +TASK PLANNING PRINCIPLES: +- Break down complex requests into logical, sequential steps +- Focus on business value and outcomes +- Keep tasks at a meaningful level of abstraction (not implementation details) +- Each task should produce results that can be used by subsequent tasks +- Ensure clear dependencies and 
handovers between tasks +- Provide clear, actionable user messages in the user's request language +- Group related activities to minimize task fragmentation +- Only create multiple tasks when dealing with truly different, independent objectives +- Make task objectives action-oriented and specific (include scope, data sources to consider, and output intent at high level) +- Write success_criteria as measurable acceptance criteria focusing on outputs (what artifacts or insights will exist and how they are validated) + +FOLLOW-UP PROMPT HANDLING: +- If the user request is a follow-up (e.g., "try again", "versuche es nochmals", "retry", "continue", "proceed"), + analyze the PREVIOUS WORKFLOW ROUNDS CONTEXT to understand what failed or was incomplete +- Use the previous round's user requests and task outcomes to determine what the user wants to retry +- If previous rounds failed due to missing documents, and documents are now available, + create tasks that use the newly available documents to accomplish the original request +- Maintain the same business objective from previous rounds but adapt to current available resources + +SPECIFIC SCENARIO HANDLING: +- If previous round failed with "documents missing" error and current round has documents available, + the user likely wants to retry the same operation with the newly provided documents +- Example: Previous round "speichere mir die 3 dokumente im sharepoint unter xxx" failed due to missing documents, + current round "versuche es nochmals" with documents should retry the SharePoint save operation +- Always check if the current request is a retry by looking for retry keywords and previous round context + +REQUIRED JSON STRUCTURE: +{{ + "overview": "Brief description of the overall plan", + "languageUserDetected": "en", // Language code detected from user request (en, de, fr, it, es, etc.) 
+ "userMessage": "User-friendly message explaining the task plan in user's request language", + "tasks": [ + {{ + "id": "task_1", + "objective": "Clear business objective this task accomplishes (combining related activities)", + "dependencies": ["task_0"], // IDs of tasks that must complete first + "success_criteria": ["criteria1", "criteria2"], + "estimated_complexity": "low|medium|high", + "userMessage": "User-friendly message explaining what this task will accomplish in user's request language" + }} + ] +}} + +EXAMPLES OF GOOD TASK OBJECTIVES (COMBINING RELATED ACTIVITIES): +- "Analyze documents and extract key insights for business communication" +- "Create professional business communication incorporating analyzed information" +- "Execute business communication using specified channels and document outcomes" +- "Develop comprehensive business strategy with implementation roadmap and success metrics" + +EXAMPLES OF WELL-FORMED SUCCESS CRITERIA (OUTPUT-FOCUSED): +- "Deliver a prioritized list of 10–20 candidates with justification" +- "Provide a structured JSON with fields: company, ticker, rationale, metrics" +- "Produce a presentation outline with 5 sections and bullet points per section" +- "Include data sources and date stamped references for traceability" + +EXAMPLES OF GOOD SUCCESS CRITERIA: +- "Key insights extracted and ready for business use" +- "Professional communication created with clear business value" +- "Business communication successfully delivered and documented" +- "All outcomes properly documented and accessible" + +EXAMPLES OF BAD TASK OBJECTIVES: +- "Read the PDF file" (too granular - should be "Analyze document content") +- "Convert data to CSV" (implementation detail - should be "Structure data for analysis") +- "Send email" (too specific - should be "Deliver business communication") + +LANGUAGE DETECTION: +- Analyze the user request text to identify the language +- Use standard language codes: en (English), de (German), fr (French), it (Italian), es (Spanish), etc. +- If the language cannot be determined, use "en" as default +- Include the detected language in the languageUserDetected field + +NOTE: Respond with ONLY the JSON object. Do not include any explanatory text.""" + + +def createActionDefinitionPromptTemplate() -> str: + """Create action definition prompt template with placeholders.""" + return """You are an action planning AI that generates specific, executable actions for task steps. + +TASK OBJECTIVE: {{KEY:USER_PROMPT}} + +AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}} + +WORKFLOW HISTORY: {{KEY:WORKFLOW_HISTORY}} + +AVAILABLE METHODS: {{KEY:AVAILABLE_METHODS}} + +USER LANGUAGE: {{KEY:USER_LANGUAGE}} + +INSTRUCTIONS: +- Generate actions to accomplish this task step using available documents, connections, and previous results +- Use docItem for single documents and docList for groups of documents as shown in AVAILABLE DOCUMENTS +- If there are no documents available, do not create document extraction actions. Select methods strictly based on the task objective; choose web actions when external information is required. Otherwise, generate a status/information report requesting needed inputs. 
+- Always pass documentList as a LIST of references (docItem and/or docList) - this list CANNOT be empty for document extraction actions
+- For referencing documents from previous actions, use the format "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}"
+- Each action must be self-contained and executable with the provided parameters
+- For document extraction, ensure prompts are specific and detailed
+- Include validation steps in extraction prompts where relevant
+- If this is a retry, learn from previous failures and improve the approach
+- Address specific issues mentioned in previous review feedback
+- When specifying expectedDocumentFormat, ensure AI prompts explicitly request pure data without markdown formatting
+- Generate user-friendly messages for each action in the user's language
+
+PARAMETER COMPLETENESS REQUIREMENTS:
+- Every parameter must contain all information needed to execute without implicit context
+- Use explicit, concrete values (units, languages, formats, limits, date ranges, IDs) when applicable
+- For search-like parameters (if any method requires a query), derive the query from the task objective AND ALL success criteria dimensions. Include:
+  - Key entities and domain terms from the objective
+  - All distinct facets from success_criteria (e.g., valuation AND AI potential AND know-how needs)
+  - Geography/localization (e.g., Schweiz/Suisse/Switzerland; use multilingual synonyms when helpful)
+  - Time horizon or recency if relevant
+  - Boolean operators and synonyms to increase precision (use AND/OR, quotes, parentheses)
+  - Avoid single-topic or generic queries focused only on one facet (e.g., pure valuation metrics)
+  - When facets are truly distinct, create 1–3 focused actions with precise queries rather than one vague catch-all
+- Document list parameters must reference only existing labels or prior action outputs; do not reference future outputs
+
+DOCUMENT ROUTING GUIDANCE:
+- Each action should produce documents with a clear resultLabel for routing
+- Use consistent naming: "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}"
+- Ensure document flow: Action A produces documents that Action B can consume
+- Document labels should be descriptive of content, not just "results" or "output"
+- Consider what subsequent actions will need and structure outputs accordingly
+
+REQUIRED JSON STRUCTURE:
+{{
+  "actions": [
+    {{
+      "method": "method_name",
+      "action": "action_name",
+      "parameters": {{}},
+      "resultLabel": "round{current_round}_task{current_task}_action{action_number}_{descriptive_label}",
+      "description": "Brief description of what this action accomplishes",
+      "userMessage": "User-friendly message explaining what this action will do in user's language"
+    }}
+  ]
+}}
+
+IMPORTANT NOTES:
+- Respond with ONLY the JSON object. Do not include any explanatory text.
+- Before creating any document extraction action, verify that AVAILABLE DOCUMENTS contains actual document references.
+- Always include a user-friendly userMessage for each action, written in the USER LANGUAGE specified above."""
+
+
+def createActionSelectionPromptTemplate() -> str:
+    """Create action selection prompt template with placeholders."""
+    return """Select exactly one action to advance the task.
+ +OBJECTIVE: {{KEY:USER_PROMPT}} +AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}} +USER LANGUAGE: {{KEY:USER_LANGUAGE}} + +MINIMAL TOOL CATALOG (method -> action -> [parameterNames]): +{{KEY:AVAILABLE_METHODS}} + +BUSINESS RULES: +- Pick exactly one action per step. +- Derive choice from objective and success criteria. +- Prefer user language. +- Keep it minimal; avoid provider specifics. + +RESPONSE FORMAT (JSON only): +{{"action":{{"method":"web","name":"search"}}}}""" + + +def createActionParameterPromptTemplate() -> str: + """Create action parameter prompt template with placeholders.""" + return """Provide only the required parameters for this action. + +SELECTED ACTION: {{KEY:SELECTED_ACTION}} +ACTION SIGNATURE: {{KEY:ACTION_SIGNATURE}} +OBJECTIVE: {{KEY:USER_PROMPT}} +AVAILABLE DOCUMENTS: {{KEY:AVAILABLE_DOCUMENTS}} +USER LANGUAGE: {{KEY:USER_LANGUAGE}} + +RULES: +- Return only the parameters object. +- Include user language if relevant. +- Reference documents only by exact labels available. +- Avoid unnecessary fields; host applies defaults. +- Use the ACTION SIGNATURE above to understand what parameters are required. +- Convert the objective into appropriate parameter values as needed. + +RESPONSE FORMAT (JSON only): +{{"parameters":{{}}}}""" + + +def createRefinementPromptTemplate() -> str: + """Create refinement prompt template with placeholders.""" + return """Decide next step based on observation. + +OBJECTIVE: {{KEY:USER_PROMPT}} +OBSERVATION: +{{KEY:REVIEW_CONTENT}} + +RULES: +- If criteria are met or no further action helps, decide stop. +- Else decide continue. + +RESPONSE FORMAT (JSON only): +{{"decision":"continue","reason":"Need more data"}}""" + + +def createResultReviewPromptTemplate() -> str: + """Create result review prompt template with placeholders.""" + return """You are a result validation AI that reviews task execution outcomes and determines success, retry needs, or failure. + +TASK OBJECTIVE: {{KEY:USER_PROMPT}} + +EXECUTION RESULTS: +{{KEY:REVIEW_CONTENT}} + +VALIDATION CRITERIA: +- Review each action's success/failure status +- Check if required documents were produced +- Validate document quality and completeness +- Assess if success criteria were met +- Identify any missing or incomplete outputs +- Determine if retry would help or if task should be marked as failed + +REQUIRED JSON STRUCTURE: +{{ + "status": "success|retry|failed", + "reason": "Detailed explanation of the validation decision", + "improvements": ["specific improvement 1", "specific improvement 2"], + "quality_score": 8, // 1-10 scale + "met_criteria": ["criteria1", "criteria2"], + "unmet_criteria": ["criteria3", "criteria4"], + "confidence": 0.85, // 0.0-1.0 scale + "userMessage": "User-friendly message explaining the validation result" +}} + +VALIDATION PRINCIPLES: +- Be thorough but fair in assessment +- Focus on business value and outcomes +- Consider both technical execution and business results +- Provide specific, actionable improvement suggestions +- Use quality scores to track progress across retries +- Clearly identify which success criteria were met vs. unmet +- Set appropriate confidence levels based on evidence quality + +NOTE: Respond with ONLY the JSON object. 
Do not include any explanatory text."""
+
+
+# Helper functions to extract content for placeholders
+
+def extractUserPrompt(context) -> str:
+    """Extract user prompt from context."""
+    if hasattr(context, 'task_step') and context.task_step:
+        return context.task_step.objective or 'No request specified'
+    return 'No request specified'
+
+
+def extractAvailableDocuments(context) -> str:
+    """Extract available documents from context."""
+    if hasattr(context, 'workflow') and context.workflow:
+        return _getAvailableDocuments(context.workflow)
+    return "No documents available"
+
+
+def extractWorkflowHistory(service, context) -> str:
+    """Extract workflow history from context."""
+    if hasattr(context, 'workflow') and context.workflow:
+        return _getPreviousRoundContext(service, context.workflow) or "No previous workflow rounds - this is the first round."
+    return "No previous workflow rounds - this is the first round."
+
+
+def extractAvailableMethods(service) -> str:
+    """Extract available methods for action planning."""
+    methodList = getMethodsList(service)
+    method_actions = {}
+    for sig in methodList:
+        if '.' in sig:
+            method, rest = sig.split('.', 1)
+            action = rest.split('(')[0]
+            method_actions.setdefault(method, []).append((action, sig))
+
+    # Create a structured JSON format for better AI parsing
+    available_methods_json = {}
+    for method, actions in method_actions.items():
+        available_methods_json[method] = {}
+        # Get the method instance for accessing docstrings
+        method_instance = methods.get(method, {}).get('instance') if methods else None
+
+        for action, sig in actions:
+            # Signatures look like "method.action(param, ...)"; the parameter
+            # details themselves are read from the action's docstring below
+            if '(' in sig and ')' in sig:
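+                # Illustrative docstring shape this parser expects (cf. methodAi above):
+                #
+                #     Parameters:
+                #         aiPrompt (str): The AI prompt for processing
+                #         documentList (list, optional): List of document references to include in context
+                #
+                # Every non-empty "name: description" line between "Parameters:" and the
+                # next section header ("Returns:", "Raises:", ...) becomes one entry.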
+                parameters = []
+
+                # Get the actual function's docstring
+                if method_instance and hasattr(method_instance, action):
+                    func = getattr(method_instance, action)
+                    if hasattr(func, '__doc__') and func.__doc__:
+                        docstring = func.__doc__
+
+                        # Parse Parameters section from docstring
+                        lines = docstring.split('\n')
+                        in_parameters = False
+                        for line in lines:
+                            line = line.strip()
+
+                            if line.startswith('Parameters:'):
+                                in_parameters = True
+                                continue
+                            elif line.startswith(('Returns:', 'Raises:', 'Note:', 'Example:', 'Examples:')):
+                                in_parameters = False
+                                continue
+                            elif in_parameters and line and not line.startswith('-') and not line.startswith('*'):
+                                # This is a parameter line
+                                if ':' in line:
+                                    param_name, param_desc = (s.strip() for s in line.split(':', 1))
+                                    parameters.append(f"{param_name}: {param_desc}")
+
+                available_methods_json[method][action] = parameters
+            else:
+                available_methods_json[method][action] = []
+
+    return json.dumps(available_methods_json, indent=2, ensure_ascii=False)
+
+
+def extractUserLanguage(service) -> str:
+    """Extract user language from service."""
+    return service.user.language if service and service.user else 'en'
+
+
+def extractReviewContent(context) -> str:
+    """Extract review content from context."""
+    if hasattr(context, 'action_results') and context.action_results:
+        # Build result summary
+        result_summary = ""
+        for i, result in enumerate(context.action_results):
+            result_summary += f"\nRESULT {i+1}:\n"
+            result_summary += f"  Success: {result.success}\n"
+            if result.error:
+                result_summary += f"  Error: {result.error}\n"
+
+            if result.documents:
+                result_summary += f"  Documents: {len(result.documents)} document(s)\n"
+                for doc in result.documents:
+                    doc_name = getattr(doc, 'documentName', 'Unknown')
+                    doc_mime = getattr(doc, 'mimeType', 'Unknown')
+                    result_summary += f"    - {doc_name} ({doc_mime})\n"
+            else:
+                result_summary += "  Documents: None\n"
+
+        return result_summary
+    elif hasattr(context, 'observation') and context.observation:
+        return json.dumps(context.observation, ensure_ascii=False)
+    else:
+        return "No review content available"
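+
+
+# Illustrative only: the {{KEY:NAME}} markers in the templates above are filled
+# by the AI service (callAi receives the template plus a placeholders dict).
+# A minimal equivalent, assuming plain string replacement, would be:
+#
+#     def _fillPlaceholders(template: str, placeholders: Dict[str, Any]) -> str:
+#         for key, value in placeholders.items():
+#             template = template.replace("{{KEY:" + key + "}}", str(value))
+#         return template
+#
+#     prompt = _fillPlaceholders(
+#         createRefinementPromptTemplate(),
+#         {"USER_PROMPT": "Summarize results", "REVIEW_CONTENT": "..."},
+#     )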