# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Knowledge service: 3-tier RAG with indexing, semantic search, and context building."""
import logging
from typing import Any, Callable, Dict, List, Optional

from modules.datamodels.datamodelKnowledge import (
    FileContentIndex,
    ContentChunk,
    WorkflowMemory,
)
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

# Target character size for embedding chunks.
DEFAULT_CHUNK_SIZE = 512
# Default character budget for the assembled RAG context string.
DEFAULT_CONTEXT_BUDGET = 8000


class KnowledgeService:
    """Service for Knowledge Store operations: indexing, retrieval, and context building."""

    def __init__(self, context, get_service: Callable[[str], Any]):
        """Create the service.

        Args:
            context: Request/session context; must expose ``user`` (and, for
                on-demand extraction, ``feature_instance_id``).
            get_service: Service locator returning a sibling service by name
                (used for "ai" and "chat").
        """
        self._context = context
        self._getService = get_service
        self._knowledgeDb = getKnowledgeInterface(context.user)

    # =========================================================================
    # Embedding helper
    # =========================================================================

    async def _embed(self, texts: List[str]) -> List[List[float]]:
        """Embed texts via the AI interface's generic embedding method.

        Returns an empty list when AI objects are unavailable or the
        embedding call reports errors (best-effort; callers tolerate []).
        """
        aiService = self._getService("ai")
        await aiService.ensureAiObjectsInitialized()
        aiObjects = aiService.aiObjects
        if aiObjects is None:
            logger.warning("Embedding skipped: aiObjects not available")
            return []
        response = await aiObjects.callEmbedding(texts)
        if response.errorCount > 0:
            logger.error(f"Embedding failed: {response.content}")
            return []
        return (response.metadata or {}).get("embeddings", [])

    async def _embedSingle(self, text: str) -> List[float]:
        """Embed a single text. Returns empty list on failure."""
        results = await self._embed([text])
        return results[0] if results else []

    # =========================================================================
    # File Indexing (called after extraction, before embedding)
    # =========================================================================

    async def indexFile(
        self,
        fileId: str,
        fileName: str,
        mimeType: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contentObjects: Optional[List[Dict[str, Any]]] = None,
        structure: Optional[Dict[str, Any]] = None,
        containerPath: Optional[str] = None,
    ) -> FileContentIndex:
        """Index a file's content objects and create embeddings for text chunks.

        This is the main entry point after non-AI extraction has produced
        content objects.

        Args:
            fileId: The file ID.
            fileName: Original file name.
            mimeType: MIME type.
            userId: Owner user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contentObjects: List of extracted content objects, each with keys:
                contentType (str), data (str), contextRef (dict),
                contentObjectId (str).
            structure: Structural overview of the file.
            containerPath: Path within container if applicable.

        Returns:
            The created FileContentIndex (status "indexed" on success).
        """
        contentObjects = contentObjects or []

        # 1. Create FileContentIndex with a lightweight per-object summary.
        index = FileContentIndex(
            id=fileId,
            userId=userId,
            featureInstanceId=featureInstanceId,
            mandateId=mandateId,
            fileName=fileName,
            mimeType=mimeType,
            containerPath=containerPath,
            totalObjects=len(contentObjects),
            totalSize=sum(len(obj.get("data", "").encode("utf-8")) for obj in contentObjects),
            structure=structure or {},
            objectSummary=[
                {
                    "id": obj.get("contentObjectId", ""),
                    "type": obj.get("contentType", "other"),
                    "size": len(obj.get("data", "").encode("utf-8")),
                    "ref": obj.get("contextRef", {}),
                }
                for obj in contentObjects
            ],
            status="extracted",
        )
        self._knowledgeDb.upsertFileContentIndex(index)

        # 2. Chunk text content objects and create embeddings.
        textObjects = [o for o in contentObjects if o.get("contentType") == "text"]
        if textObjects:
            self._knowledgeDb.updateFileStatus(fileId, "embedding")
            chunks = _chunkForEmbedding(textObjects, chunkSize=DEFAULT_CHUNK_SIZE)
            texts = [c["data"] for c in chunks]
            embeddings = await self._embed(texts) if texts else []
            for i, chunk in enumerate(chunks):
                # Embedding may have failed (shorter/empty list); store the
                # chunk anyway so its text remains retrievable by filters.
                embedding = embeddings[i] if i < len(embeddings) else None
                contentChunk = ContentChunk(
                    contentObjectId=chunk["contentObjectId"],
                    fileId=fileId,
                    userId=userId,
                    featureInstanceId=featureInstanceId,
                    contentType="text",
                    data=chunk["data"],
                    contextRef=chunk["contextRef"],
                    embedding=embedding,
                )
                self._knowledgeDb.upsertContentChunk(contentChunk)

        # 3. Store non-text content objects (images, etc.) without embedding.
        nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"]
        for obj in nonTextObjects:
            contentChunk = ContentChunk(
                contentObjectId=obj.get("contentObjectId", ""),
                fileId=fileId,
                userId=userId,
                featureInstanceId=featureInstanceId,
                contentType=obj.get("contentType", "other"),
                data=obj.get("data", ""),
                contextRef=obj.get("contextRef", {}),
                embedding=None,
            )
            self._knowledgeDb.upsertContentChunk(contentChunk)

        self._knowledgeDb.updateFileStatus(fileId, "indexed")
        index.status = "indexed"
        logger.info(f"Indexed file {fileId} ({fileName}): {len(contentObjects)} objects, {len(textObjects)} text chunks")
        return index

    # =========================================================================
    # RAG Context Building (3-tier search)
    # =========================================================================

    async def buildAgentContext(
        self,
        currentPrompt: str,
        workflowId: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contextBudget: int = DEFAULT_CONTEXT_BUDGET,
    ) -> str:
        """Build RAG context for an agent round by searching all 3 layers.

        Args:
            currentPrompt: The current user prompt to find relevant context for.
            workflowId: Current workflow ID.
            userId: Current user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contextBudget: Maximum characters for the context string.

        Returns:
            Formatted context string for injection into the agent's system
            prompt, or "" when the prompt could not be embedded.
        """
        queryVector = await self._embedSingle(currentPrompt)
        if not queryVector:
            return ""

        builder = _ContextBuilder(budget=contextBudget)

        # Layer 1: Instance Layer (user's own documents, highest priority).
        instanceChunks = self._knowledgeDb.semanticSearch(
            queryVector=queryVector,
            userId=userId,
            featureInstanceId=featureInstanceId,
            limit=15,
            minScore=0.65,
        )
        if instanceChunks:
            builder.add(priority=1, label="Relevant Documents", items=instanceChunks)

        # Layer 2: Workflow Layer (current workflow entities & memory).
        entities = self._knowledgeDb.getWorkflowEntities(workflowId)
        if entities:
            builder.add(priority=2, label="Workflow Context", items=entities, isKeyValue=True)

        # Layer 3: Shared Layer (mandate-wide shared documents). Higher
        # minScore than layer 1 keeps shared content stricter.
        sharedChunks = self._knowledgeDb.semanticSearch(
            queryVector=queryVector,
            mandateId=mandateId,
            isShared=True,
            limit=10,
            minScore=0.7,
        )
        if sharedChunks:
            builder.add(priority=3, label="Shared Knowledge", items=sharedChunks)

        return builder.build()

    # =========================================================================
    # Workflow Memory
    # =========================================================================

    async def storeEntity(
        self,
        workflowId: str,
        userId: str,
        featureInstanceId: str,
        key: str,
        value: str,
        source: str = "extraction",
    ) -> WorkflowMemory:
        """Store a key-value entity in workflow memory with optional embedding."""
        # Embed the "key: value" pair so entities participate in semantic search.
        embedding = await self._embedSingle(f"{key}: {value}")
        memory = WorkflowMemory(
            workflowId=workflowId,
            userId=userId,
            featureInstanceId=featureInstanceId,
            key=key,
            value=value,
            source=source,
            embedding=embedding if embedding else None,
        )
        self._knowledgeDb.upsertWorkflowMemory(memory)
        return memory

    def getEntities(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all entities for a workflow."""
        return self._knowledgeDb.getWorkflowEntities(workflowId)

    # =========================================================================
    # File Status
    # =========================================================================

    def getFileStatus(self, fileId: str) -> Optional[str]:
        """Get the indexing status of a file, or None if the file is unknown."""
        index = self._knowledgeDb.getFileContentIndex(fileId)
        if not index:
            return None
        # Fix: the DB interface may return a dict or a model object —
        # readSection/extractContainerItem already handle both; do so here too.
        return index.get("status") if isinstance(index, dict) else getattr(index, "status", None)

    def isFileIndexed(self, fileId: str) -> bool:
        """Check if a file has been fully indexed."""
        return self.getFileStatus(fileId) == "indexed"

    # =========================================================================
    # On-Demand Extraction (Smart Document Handling)
    # =========================================================================

    async def readSection(self, fileId: str, sectionId: str) -> List[Dict[str, Any]]:
        """Read content objects for a specific section. Uses cache if available.

        Args:
            fileId: Source file ID.
            sectionId: Section identifier from the FileContentIndex structure.

        Returns:
            List of content object dicts with data and contextRef.
        """
        # Fast path: chunks already cached for this section.
        cached = self._knowledgeDb.getContentChunks(fileId)
        sectionChunks = [
            c for c in (cached or [])
            if (c.get("contextRef", {}).get("sectionId") == sectionId)
        ]
        if sectionChunks:
            return sectionChunks

        # Slow path: locate the section's page range and extract on demand.
        index = self._knowledgeDb.getFileContentIndex(fileId)
        if not index:
            return []
        structure = index.get("structure", {}) if isinstance(index, dict) else getattr(index, "structure", {})
        sections = structure.get("sections", [])
        section = next((s for s in sections if s.get("id") == sectionId), None)
        if not section:
            return []
        startPage = section.get("startPage", 0)
        endPage = section.get("endPage", startPage)
        return await self._extractPagesOnDemand(fileId, startPage, endPage, sectionId)

    async def readContentObjects(
        self, fileId: str, filter: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """Read content objects with optional filters (pageIndex, contentType, sectionId).

        Args:
            fileId: Source file ID.
            filter: Optional dict with keys pageIndex (list[int] or int),
                contentType (str), sectionId (str). (Name shadows the builtin
                but is kept for caller compatibility.)

        Returns:
            Filtered list of content chunk dicts.
        """
        filter = filter or {}
        chunks = self._knowledgeDb.getContentChunks(fileId) or []

        if "pageIndex" in filter:
            targetPages = filter["pageIndex"]
            if isinstance(targetPages, int):
                targetPages = [targetPages]
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("pageIndex") in targetPages
            ]
        if "contentType" in filter:
            chunks = [c for c in chunks if c.get("contentType") == filter["contentType"]]
        if "sectionId" in filter:
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("sectionId") == filter["sectionId"]
            ]
        return chunks

    async def extractContainerItem(
        self, fileId: str, containerPath: str
    ) -> Optional[Dict[str, Any]]:
        """On-demand extraction of a specific item within a container.

        If the item is already indexed, returns existing data. Otherwise
        triggers extraction and indexing.

        Args:
            fileId: The container file ID.
            containerPath: Path within the container (e.g. "folder/report.pdf").

        Returns:
            FileContentIndex dict for the extracted item, or None.
        """
        existing = self._knowledgeDb.getFileContentIndex(fileId)
        if existing:
            existingPath = existing.get("containerPath") if isinstance(existing, dict) else getattr(existing, "containerPath", None)
            if existingPath == containerPath:
                return existing
        # NOTE(review): extraction itself is not implemented here yet — the
        # visible code only logs and returns None when the item is not cached.
        logger.info(f"On-demand extraction for {containerPath} in file {fileId}")
        return None

    async def _extractPagesOnDemand(
        self, fileId: str, startPage: int, endPage: int, sectionId: str
    ) -> List[Dict[str, Any]]:
        """Extract specific pages from a PDF file and cache them in the knowledge store.

        Args:
            fileId: Source file ID.
            startPage: First page index (0-based, inclusive).
            endPage: Last page index (inclusive; clamped to document length).
            sectionId: Section identifier recorded in each chunk's contextRef.

        Returns:
            List of ContentChunk dicts for the extracted pages; empty list on
            any failure (best-effort, errors are logged).
        """
        try:
            chatService = self._getService("chat")
            fileContent = chatService.getFileContent(fileId)
            if not fileContent:
                return []
            fileData = fileContent.get("data", b"")
            mimeType = fileContent.get("mimeType", "")
            fileName = fileContent.get("fileName", "")
            if isinstance(fileData, str):
                # Payload may arrive base64-encoded as a string.
                import base64
                fileData = base64.b64decode(fileData)
            # Only PDF extraction is supported on demand.
            if mimeType != "application/pdf":
                return []
            try:
                import fitz  # PyMuPDF — optional dependency
            except ImportError:
                return []
            doc = fitz.open(stream=fileData, filetype="pdf")
            try:
                results = []
                for pageIdx in range(startPage, min(endPage + 1, len(doc))):
                    page = doc[pageIdx]
                    text = page.get_text() or ""
                    if not text.strip():
                        continue  # skip blank pages
                    chunk = ContentChunk(
                        contentObjectId=f"page-{pageIdx}",
                        fileId=fileId,
                        userId=self._context.user.id if self._context.user else "",
                        featureInstanceId=self._context.feature_instance_id or "",
                        contentType="text",
                        data=text,
                        contextRef={
                            "containerPath": fileName,
                            "location": f"page:{pageIdx+1}",
                            "pageIndex": pageIdx,
                            "sectionId": sectionId,
                        },
                    )
                    # Embed only a prefix to bound per-page embedding cost.
                    embedding = await self._embedSingle(text[:2000])
                    if embedding:
                        chunk.embedding = embedding
                    self._knowledgeDb.upsertContentChunk(chunk)
                    results.append(chunk.model_dump())
                return results
            finally:
                # Fix: close the document even if extraction raises mid-loop
                # (previously leaked on exception).
                doc.close()
        except Exception as e:
            logger.error(f"On-demand page extraction failed: {e}")
            return []

    def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Get the FileContentIndex for a file."""
        return self._knowledgeDb.getFileContentIndex(fileId)
============================================================================= # Internal helpers # ============================================================================= def _chunkForEmbedding( textObjects: List[Dict[str, Any]], chunkSize: int = 512 ) -> List[Dict[str, Any]]: """Split text content objects into chunks suitable for embedding. Each chunk preserves the contextRef from its source object. Long texts are split at sentence boundaries where possible. """ chunks = [] for obj in textObjects: text = obj.get("data", "") contentObjectId = obj.get("contentObjectId", "") contextRef = obj.get("contextRef", {}) if len(text) <= chunkSize: chunks.append({ "data": text, "contentObjectId": contentObjectId, "contextRef": contextRef, }) continue # Split at sentence boundaries sentences = text.replace("\n", " ").split(". ") currentChunk = "" for sentence in sentences: candidate = f"{currentChunk}. {sentence}" if currentChunk else sentence if len(candidate) > chunkSize and currentChunk: chunks.append({ "data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef, }) currentChunk = sentence else: currentChunk = candidate if currentChunk.strip(): chunks.append({ "data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef, }) return chunks class _ContextBuilder: """Assembles RAG context from multiple sources respecting a character budget.""" def __init__(self, budget: int): self._budget = budget self._sections: List[Dict[str, Any]] = [] def add( self, priority: int, label: str, items: List[Dict[str, Any]], isKeyValue: bool = False, ): self._sections.append({ "priority": priority, "label": label, "items": items, "isKeyValue": isKeyValue, }) def build(self) -> str: self._sections.sort(key=lambda s: s["priority"]) parts = [] remaining = self._budget for section in self._sections: if remaining <= 0: break header = f"### {section['label']}\n" sectionText = header remaining -= len(header) for item in 
section["items"]: if remaining <= 0: break if section["isKeyValue"]: line = f"- {item.get('key', '')}: {item.get('value', '')}\n" else: data = item.get("data", "") ref = item.get("contextRef", {}) score = item.get("_score", "") refStr = f" [{ref}]" if ref else "" line = f"{data}{refStr}\n" if len(line) <= remaining: sectionText += line remaining -= len(line) parts.append(sectionText) return "\n".join(parts).strip()