# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Knowledge service: 3-tier RAG with indexing, semantic search, and context building.""" import logging import re from typing import Any, Callable, Dict, List, Optional from modules.datamodels.datamodelKnowledge import ( FileContentIndex, ContentChunk, WorkflowMemory, ) from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface from modules.shared.timeUtils import getUtcTimestamp logger = logging.getLogger(__name__) CHARS_PER_TOKEN = 4 DEFAULT_CHUNK_TOKENS = 400 DEFAULT_CONTEXT_BUDGET = 8000 class KnowledgeService: """Service for Knowledge Store operations: indexing, retrieval, and context building.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context self._getService = get_service self._knowledgeDb = getKnowledgeInterface(context.user) # ========================================================================= # Embedding helper # ========================================================================= async def _embed(self, texts: List[str]) -> List[List[float]]: """Embed texts via AiService (respects allowedProviders).""" aiService = self._getService("ai") response = await aiService.callEmbedding(texts) if response.errorCount > 0: logger.error(f"Embedding failed: {response.content}") return [] return (response.metadata or {}).get("embeddings", []) async def _embedSingle(self, text: str) -> List[float]: """Embed a single text. Returns empty list on failure.""" results = await self._embed([text]) return results[0] if results else [] # ========================================================================= # File Indexing (called after extraction, before embedding) # ========================================================================= async def indexFile( self, fileId: str, fileName: str, mimeType: str, userId: str, featureInstanceId: str = "", mandateId: str = "", contentObjects: List[Dict[str, Any]] = None, structure: Dict[str, Any] = None, containerPath: str = None, ) -> FileContentIndex: """Index a file's content objects and create embeddings for text chunks. This is the main entry point after non-AI extraction has produced content objects. Args: fileId: The file ID. fileName: Original file name. mimeType: MIME type. userId: Owner user. featureInstanceId: Feature instance scope. mandateId: Mandate scope. contentObjects: List of extracted content objects, each with keys: contentType (str), data (str), contextRef (dict), contentObjectId (str). structure: Structural overview of the file. containerPath: Path within container if applicable. Returns: The created FileContentIndex. """ contentObjects = contentObjects or [] # 1. Create FileContentIndex index = FileContentIndex( id=fileId, userId=userId, featureInstanceId=featureInstanceId, mandateId=mandateId, fileName=fileName, mimeType=mimeType, containerPath=containerPath, totalObjects=len(contentObjects), totalSize=sum(len(obj.get("data", "").encode("utf-8")) for obj in contentObjects), structure=structure or {}, objectSummary=[ { "id": obj.get("contentObjectId", ""), "type": obj.get("contentType", "other"), "size": len(obj.get("data", "").encode("utf-8")), "ref": obj.get("contextRef", {}), } for obj in contentObjects ], status="extracted", ) self._knowledgeDb.upsertFileContentIndex(index) # 2. Chunk text content objects and create embeddings textObjects = [o for o in contentObjects if o.get("contentType") == "text"] if textObjects: self._knowledgeDb.updateFileStatus(fileId, "embedding") chunks = _chunkForEmbedding(textObjects, maxTokens=DEFAULT_CHUNK_TOKENS) texts = [c["data"] for c in chunks] totalChars = sum(len(t) for t in texts) estTokens = totalChars // CHARS_PER_TOKEN logger.info( f"Embedding file {fileId}: {len(textObjects)} text objects -> " f"{len(chunks)} chunks, ~{estTokens} tokens total" ) embeddings = await self._embed(texts) if texts else [] for i, chunk in enumerate(chunks): embedding = embeddings[i] if i < len(embeddings) else None contentChunk = ContentChunk( contentObjectId=chunk["contentObjectId"], fileId=fileId, userId=userId, featureInstanceId=featureInstanceId, contentType="text", data=chunk["data"], contextRef=chunk["contextRef"], embedding=embedding, ) self._knowledgeDb.upsertContentChunk(contentChunk) # 3. Store non-text content objects (images, etc.) without embedding nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"] for obj in nonTextObjects: contentChunk = ContentChunk( contentObjectId=obj.get("contentObjectId", ""), fileId=fileId, userId=userId, featureInstanceId=featureInstanceId, contentType=obj.get("contentType", "other"), data=obj.get("data", ""), contextRef=obj.get("contextRef", {}), embedding=None, ) self._knowledgeDb.upsertContentChunk(contentChunk) self._knowledgeDb.updateFileStatus(fileId, "indexed") index.status = "indexed" logger.info(f"Indexed file {fileId} ({fileName}): {len(contentObjects)} objects, {len(textObjects)} text chunks") return index # ========================================================================= # RAG Context Building (3-tier search) # ========================================================================= async def buildAgentContext( self, currentPrompt: str, workflowId: str, userId: str, featureInstanceId: str = "", mandateId: str = "", contextBudget: int = DEFAULT_CONTEXT_BUDGET, ) -> str: """Build RAG context for an agent round by searching all 3 layers. Args: currentPrompt: The current user prompt to find relevant context for. workflowId: Current workflow ID. userId: Current user. featureInstanceId: Feature instance scope. mandateId: Mandate scope. contextBudget: Maximum characters for the context string. Returns: Formatted context string for injection into the agent's system prompt. """ queryVector = await self._embedSingle(currentPrompt) if not queryVector: return "" builder = _ContextBuilder(budget=contextBudget) # Layer 1: Instance Layer (user's own documents, highest priority) instanceChunks = self._knowledgeDb.semanticSearch( queryVector=queryVector, userId=userId, featureInstanceId=featureInstanceId, limit=15, minScore=0.65, ) if instanceChunks: builder.add(priority=1, label="Relevant Documents", items=instanceChunks) # Layer 2: Workflow Layer (current workflow entities & memory) entities = self._knowledgeDb.getWorkflowEntities(workflowId) if entities: builder.add(priority=2, label="Workflow Context", items=entities, isKeyValue=True) # Layer 3: Shared Layer (mandate-wide shared documents) sharedChunks = self._knowledgeDb.semanticSearch( queryVector=queryVector, mandateId=mandateId, isShared=True, limit=10, minScore=0.7, ) if sharedChunks: builder.add(priority=3, label="Shared Knowledge", items=sharedChunks) return builder.build() # ========================================================================= # Workflow Memory # ========================================================================= async def storeEntity( self, workflowId: str, userId: str, featureInstanceId: str, key: str, value: str, source: str = "extraction", ) -> WorkflowMemory: """Store a key-value entity in workflow memory with optional embedding.""" embedding = await self._embedSingle(f"{key}: {value}") memory = WorkflowMemory( workflowId=workflowId, userId=userId, featureInstanceId=featureInstanceId, key=key, value=value, source=source, embedding=embedding if embedding else None, ) self._knowledgeDb.upsertWorkflowMemory(memory) return memory def getEntities(self, workflowId: str) -> List[Dict[str, Any]]: """Get all entities for a workflow.""" return self._knowledgeDb.getWorkflowEntities(workflowId) # ========================================================================= # File Status # ========================================================================= def getFileStatus(self, fileId: str) -> Optional[str]: """Get the indexing status of a file.""" index = self._knowledgeDb.getFileContentIndex(fileId) return index.get("status") if index else None def isFileIndexed(self, fileId: str) -> bool: """Check if a file has been fully indexed.""" return self.getFileStatus(fileId) == "indexed" # ========================================================================= # On-Demand Extraction (Smart Document Handling) # ========================================================================= async def readSection(self, fileId: str, sectionId: str) -> List[Dict[str, Any]]: """Read content objects for a specific section. Uses cache if available. Args: fileId: Source file ID. sectionId: Section identifier from the FileContentIndex structure. Returns: List of content object dicts with data and contextRef. """ cached = self._knowledgeDb.getContentChunks(fileId) sectionChunks = [ c for c in (cached or []) if (c.get("contextRef", {}).get("sectionId") == sectionId) ] if sectionChunks: return sectionChunks index = self._knowledgeDb.getFileContentIndex(fileId) if not index: return [] structure = index.get("structure", {}) if isinstance(index, dict) else getattr(index, "structure", {}) sections = structure.get("sections", []) section = next((s for s in sections if s.get("id") == sectionId), None) if not section: return [] startPage = section.get("startPage", 0) endPage = section.get("endPage", startPage) return await self._extractPagesOnDemand(fileId, startPage, endPage, sectionId) async def readContentObjects( self, fileId: str, filter: Dict[str, Any] = None ) -> List[Dict[str, Any]]: """Read content objects with optional filters (pageIndex, contentType, sectionId). Args: fileId: Source file ID. filter: Optional dict with keys pageIndex (list[int]), contentType (str), sectionId (str). Returns: Filtered list of content chunk dicts. """ filter = filter or {} chunks = self._knowledgeDb.getContentChunks(fileId) or [] if "pageIndex" in filter: targetPages = filter["pageIndex"] if isinstance(targetPages, int): targetPages = [targetPages] chunks = [ c for c in chunks if c.get("contextRef", {}).get("pageIndex") in targetPages ] if "contentType" in filter: chunks = [c for c in chunks if c.get("contentType") == filter["contentType"]] if "sectionId" in filter: chunks = [ c for c in chunks if c.get("contextRef", {}).get("sectionId") == filter["sectionId"] ] return chunks async def extractContainerItem( self, fileId: str, containerPath: str ) -> Optional[Dict[str, Any]]: """On-demand extraction of a specific item within a container. If the item is already indexed, returns existing data. Otherwise triggers extraction and indexing. Args: fileId: The container file ID. containerPath: Path within the container (e.g. "folder/report.pdf"). Returns: FileContentIndex dict for the extracted item, or None. """ existing = self._knowledgeDb.getFileContentIndex(fileId) if existing: existingPath = existing.get("containerPath") if isinstance(existing, dict) else getattr(existing, "containerPath", None) if existingPath == containerPath: return existing logger.info(f"On-demand extraction for {containerPath} in file {fileId}") return None async def _extractPagesOnDemand( self, fileId: str, startPage: int, endPage: int, sectionId: str ) -> List[Dict[str, Any]]: """Extract specific pages from a file and cache in knowledge store.""" try: chatService = self._getService("chat") fileContent = chatService.getFileContent(fileId) if not fileContent: return [] fileData = fileContent.get("data", b"") mimeType = fileContent.get("mimeType", "") fileName = fileContent.get("fileName", "") if isinstance(fileData, str): import base64 fileData = base64.b64decode(fileData) if mimeType != "application/pdf": return [] try: import fitz except ImportError: return [] doc = fitz.open(stream=fileData, filetype="pdf") results = [] for pageIdx in range(startPage, min(endPage + 1, len(doc))): page = doc[pageIdx] text = page.get_text() or "" if not text.strip(): continue chunk = ContentChunk( contentObjectId=f"page-{pageIdx}", fileId=fileId, userId=self._context.user.id if self._context.user else "", featureInstanceId=self._context.feature_instance_id or "", contentType="text", data=text, contextRef={ "containerPath": fileName, "location": f"page:{pageIdx+1}", "pageIndex": pageIdx, "sectionId": sectionId, }, ) embedding = await self._embedSingle(text[:2000]) if embedding: chunk.embedding = embedding self._knowledgeDb.upsertContentChunk(chunk) results.append(chunk.model_dump()) doc.close() return results except Exception as e: logger.error(f"On-demand page extraction failed: {e}") return [] def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]: """Get the FileContentIndex for a file.""" return self._knowledgeDb.getFileContentIndex(fileId) # ============================================================================= # Internal helpers # ============================================================================= def _estimateTokens(text: str) -> int: """Estimate token count using character-based heuristic (1 token ~ 4 chars).""" return max(1, len(text) // CHARS_PER_TOKEN) def _splitSentences(text: str) -> List[str]: """Split text into sentences at common boundaries (.!?) followed by whitespace.""" parts = re.split(r'(?<=[.!?])\s+', text.replace("\n", " ").strip()) return [p for p in parts if p.strip()] def _hardSplitByTokens(text: str, maxTokens: int) -> List[str]: """Force-split text into pieces that each fit within maxTokens. Used as safety net when sentence splitting produces oversized segments. Splits at word boundaries where possible. """ maxChars = maxTokens * CHARS_PER_TOKEN pieces = [] while len(text) > maxChars: splitAt = text.rfind(" ", 0, maxChars) if splitAt <= 0: splitAt = maxChars pieces.append(text[:splitAt].strip()) text = text[splitAt:].strip() if text: pieces.append(text) return pieces def _chunkForEmbedding( textObjects: List[Dict[str, Any]], maxTokens: int = DEFAULT_CHUNK_TOKENS ) -> List[Dict[str, Any]]: """Split text content objects into token-aware chunks suitable for embedding. Each chunk preserves the contextRef from its source object. Splits at sentence boundaries; applies hard-cap if a single sentence exceeds maxTokens. """ chunks = [] for obj in textObjects: text = (obj.get("data", "") or "").strip() if not text: continue contentObjectId = obj.get("contentObjectId", "") contextRef = obj.get("contextRef", {}) if _estimateTokens(text) <= maxTokens: chunks.append({"data": text, "contentObjectId": contentObjectId, "contextRef": contextRef}) continue sentences = _splitSentences(text) currentChunk = "" for sentence in sentences: if _estimateTokens(sentence) > maxTokens: if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) currentChunk = "" for piece in _hardSplitByTokens(sentence, maxTokens): chunks.append({"data": piece, "contentObjectId": contentObjectId, "contextRef": contextRef}) continue candidate = f"{currentChunk} {sentence}".strip() if currentChunk else sentence if _estimateTokens(candidate) > maxTokens: if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) currentChunk = sentence else: currentChunk = candidate if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) return chunks class _ContextBuilder: """Assembles RAG context from multiple sources respecting a character budget.""" def __init__(self, budget: int): self._budget = budget self._sections: List[Dict[str, Any]] = [] def add( self, priority: int, label: str, items: List[Dict[str, Any]], isKeyValue: bool = False, ): self._sections.append({ "priority": priority, "label": label, "items": items, "isKeyValue": isKeyValue, }) def build(self) -> str: self._sections.sort(key=lambda s: s["priority"]) parts = [] remaining = self._budget for section in self._sections: if remaining <= 0: break header = f"### {section['label']}\n" sectionText = header remaining -= len(header) for item in section["items"]: if remaining <= 0: break if section["isKeyValue"]: line = f"- {item.get('key', '')}: {item.get('value', '')}\n" else: data = item.get("data", "") ref = item.get("contextRef", {}) score = item.get("_score", "") refStr = f" [{ref}]" if ref else "" line = f"{data}{refStr}\n" if len(line) <= remaining: sectionText += line remaining -= len(line) parts.append(sectionText) return "\n".join(parts).strip()