# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Knowledge service: 3-tier RAG with indexing, semantic search, and context building."""

import hashlib
import json
import logging
import re
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Union

from modules.datamodels.datamodelKnowledge import (
    FileContentIndex,
    ContentChunk,
    WorkflowMemory,
)
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

CHARS_PER_TOKEN = 4
DEFAULT_CHUNK_TOKENS = 400
DEFAULT_CONTEXT_BUDGET = 12000


# =============================================================================
# Ingestion façade (P0 of unified-knowledge-indexing concept)
# =============================================================================


@dataclass
class IngestionJob:
    """One request to add or refresh content in the unified knowledge store.

    Callers from any lane (routes, feature hooks, agent tools, connector sync)
    describe the work they want done via this object; idempotency, scope
    resolution, and embedding are handled by KnowledgeService.requestIngestion.
    """

    sourceKind: str
    sourceId: str
    fileName: str
    mimeType: str
    userId: str
    contentObjects: List[Dict[str, Any]] = field(default_factory=list)
    featureInstanceId: str = ""
    mandateId: str = ""
    structure: Optional[Dict[str, Any]] = None
    containerPath: Optional[str] = None
    contentVersion: Optional[str] = None
    provenance: Optional[Dict[str, Any]] = None


@dataclass
class IngestionHandle:
    """Result of requestIngestion. Stable across in-process and future queue impls."""

    jobId: str
    status: str
    contentHash: str
    fileId: str
    index: Optional[FileContentIndex] = None
    error: Optional[str] = None


def _computeIngestionHash(contentObjects: List[Dict[str, Any]]) -> str:
    """Deterministic SHA256 over (contentType, data) tuples in extractor order.

    `contentObjectId` is intentionally excluded because extractors generate fresh
    UUIDs per run (`uuid.uuid4()`), which would make the hash unstable across
    re-extractions of the same source — defeating idempotency. Order is preserved
    (no sort) because two different documents can share the same multiset of parts
    but differ in arrangement (e.g. swapped pages). Text whitespace is preserved
    intentionally because chunk boundaries depend on it.
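
    Example (illustrative): two extraction runs over the same source produce fresh
    contentObjectId values but identical (contentType, data) pairs, so they hash
    the same and the second run is treated as a duplicate.

        >>> a = [{"contentObjectId": "uuid-1", "contentType": "text", "data": "Hello"}]
        >>> b = [{"contentObjectId": "uuid-2", "contentType": "text", "data": "Hello"}]
        >>> _computeIngestionHash(a) == _computeIngestionHash(b)
        True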
""" normalized = [ ( str(o.get("contentType", "text") or "text"), o.get("data", "") or "", ) for o in (contentObjects or []) ] payload = json.dumps(normalized, ensure_ascii=False, separators=(",", ":")) return hashlib.sha256(payload.encode("utf-8")).hexdigest() class KnowledgeService: """Service for Knowledge Store operations: indexing, retrieval, and context building.""" def __init__(self, context, get_service: Callable[[str], Any]): self._context = context self._getService = get_service self._knowledgeDb = getKnowledgeInterface(context.user) # ========================================================================= # Embedding helper # ========================================================================= async def _embed(self, texts: List[str]) -> List[List[float]]: """Embed texts via AiService (respects allowedProviders).""" aiService = self._getService("ai") response = await aiService.callEmbedding(texts) if response.errorCount > 0: logger.error(f"Embedding failed: {response.content}") return [] return (response.metadata or {}).get("embeddings", []) async def _embedSingle(self, text: str) -> List[float]: """Embed a single text. Returns empty list on failure.""" results = await self._embed([text]) return results[0] if results else [] # ========================================================================= # Ingestion façade (single entry point for all lanes) # ========================================================================= async def requestIngestion(self, job: IngestionJob) -> IngestionHandle: """Unified entry point for filling the knowledge corpus. Applies idempotency based on a content hash (or caller-supplied `contentVersion`) persisted in `FileContentIndex.structure._ingestion`. Re-runs indexing only when the hash differs or the previous run did not reach `indexed` state. Runs embedding synchronously for now (callers already schedule background tasks where needed). """ jobId = f"{job.sourceKind}:{job.sourceId}" startMs = time.time() contentHash = job.contentVersion or _computeIngestionHash(job.contentObjects) # 1. Check for duplicate via existing FileContentIndex row. existing = None try: existing = self._knowledgeDb.getFileContentIndex(job.sourceId) except Exception: existing = None if existing: existingStructure = ( existing.get("structure") if isinstance(existing, dict) else getattr(existing, "structure", {}) ) or {} existingMeta = existingStructure.get("_ingestion", {}) or {} existingStatus = ( existing.get("status") if isinstance(existing, dict) else getattr(existing, "status", "") ) or "" if existingMeta.get("hash") == contentHash and existingStatus == "indexed": logger.info( "ingestion.skipped.duplicate sourceKind=%s sourceId=%s hash=%s", job.sourceKind, job.sourceId, contentHash[:12], extra={ "event": "ingestion.skipped.duplicate", "jobId": jobId, "sourceKind": job.sourceKind, "sourceId": job.sourceId, "hash": contentHash, "durationMs": int((time.time() - startMs) * 1000), }, ) return IngestionHandle( jobId=jobId, status="duplicate", contentHash=contentHash, fileId=job.sourceId, index=None, ) # 2. Prepare ingestion metadata; stays in structure._ingestion so # later connector revoke/purge can filter chunks by sourceKind / # provenance.connectionId without a schema migration. 
        ingestionMeta = {
            "hash": contentHash,
            "sourceKind": job.sourceKind,
            "sourceId": job.sourceId,
            "contentVersion": job.contentVersion,
            "indexedAt": getUtcTimestamp(),
            "provenance": dict(job.provenance or {}),
        }
        structure = dict(job.structure or {})
        structure["_ingestion"] = ingestionMeta

        logger.info(
            "ingestion.queued sourceKind=%s sourceId=%s objects=%d hash=%s",
            job.sourceKind,
            job.sourceId,
            len(job.contentObjects or []),
            contentHash[:12],
            extra={
                "event": "ingestion.queued",
                "jobId": jobId,
                "sourceKind": job.sourceKind,
                "sourceId": job.sourceId,
                "hash": contentHash,
                "objectCount": len(job.contentObjects or []),
            },
        )

        # 3. Run real indexing.
        try:
            index = await self._indexFileInternal(
                fileId=job.sourceId,
                fileName=job.fileName,
                mimeType=job.mimeType,
                userId=job.userId,
                featureInstanceId=job.featureInstanceId,
                mandateId=job.mandateId,
                contentObjects=job.contentObjects or [],
                structure=structure,
                containerPath=job.containerPath,
            )
        except Exception as exc:
            logger.error(
                "ingestion.failed sourceKind=%s sourceId=%s error=%s",
                job.sourceKind,
                job.sourceId,
                exc,
                exc_info=True,
                extra={
                    "event": "ingestion.failed",
                    "jobId": jobId,
                    "sourceKind": job.sourceKind,
                    "sourceId": job.sourceId,
                    "hash": contentHash,
                    "error": str(exc),
                    "durationMs": int((time.time() - startMs) * 1000),
                },
            )
            try:
                self._knowledgeDb.updateFileStatus(job.sourceId, "failed")
            except Exception:
                pass
            return IngestionHandle(
                jobId=jobId,
                status="failed",
                contentHash=contentHash,
                fileId=job.sourceId,
                index=None,
                error=str(exc),
            )

        logger.info(
            "ingestion.indexed sourceKind=%s sourceId=%s objects=%d durationMs=%d",
            job.sourceKind,
            job.sourceId,
            len(job.contentObjects or []),
            int((time.time() - startMs) * 1000),
            extra={
                "event": "ingestion.indexed",
                "jobId": jobId,
                "sourceKind": job.sourceKind,
                "sourceId": job.sourceId,
                "hash": contentHash,
                "objectCount": len(job.contentObjects or []),
                "durationMs": int((time.time() - startMs) * 1000),
            },
        )
        return IngestionHandle(
            jobId=jobId,
            status="indexed",
            contentHash=contentHash,
            fileId=job.sourceId,
            index=index,
        )

    def getIngestionStatus(
        self, handleOrJobId: Union[IngestionHandle, str]
    ) -> Dict[str, Any]:
        """Map a handle or `sourceKind:sourceId` jobId to a status snapshot."""
        if isinstance(handleOrJobId, IngestionHandle):
            sourceId = handleOrJobId.fileId
            jobId = handleOrJobId.jobId
        elif isinstance(handleOrJobId, str) and ":" in handleOrJobId:
            jobId = handleOrJobId
            sourceId = handleOrJobId.split(":", 1)[1]
        else:
            jobId = str(handleOrJobId)
            sourceId = str(handleOrJobId)

        row = None
        try:
            row = self._knowledgeDb.getFileContentIndex(sourceId)
        except Exception:
            row = None
        if not row:
            return {
                "jobId": jobId,
                "sourceId": sourceId,
                "status": "unknown",
                "contentHash": None,
            }

        structure = (
            row.get("structure")
            if isinstance(row, dict)
            else getattr(row, "structure", {})
        ) or {}
        meta = structure.get("_ingestion", {}) or {}
        status = (
            row.get("status")
            if isinstance(row, dict)
            else getattr(row, "status", "")
        ) or "unknown"
        return {
            "jobId": jobId,
            "sourceId": sourceId,
            "status": status,
            "contentHash": meta.get("hash"),
            "sourceKind": meta.get("sourceKind"),
            "indexedAt": meta.get("indexedAt"),
        }
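
    # Illustrative usage (sketch; `knowledgeService` and `job` stand for an
    # instance of this class and an IngestionJob built by the caller):
    #
    #   handle = await knowledgeService.requestIngestion(job)
    #   knowledgeService.getIngestionStatus(handle)
    #   # -> {"jobId": "file:...", "sourceId": "...", "status": "indexed",
    #   #     "contentHash": "...", "sourceKind": "file", "indexedAt": "..."}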

    # =========================================================================
    # File Indexing (called after extraction, before embedding)
    # =========================================================================

    async def indexFile(
        self,
        fileId: str,
        fileName: str,
        mimeType: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contentObjects: List[Dict[str, Any]] = None,
        structure: Dict[str, Any] = None,
        containerPath: str = None,
    ) -> Optional[FileContentIndex]:
        """Backward-compatible wrapper delegating to requestIngestion.

        Existing callers that still invoke `indexFile` directly automatically
        participate in the idempotency/metrics layer. New callers should prefer
        `requestIngestion` so they can pass `sourceKind` and `provenance` for
        connector revoke/purge later.
        """
        job = IngestionJob(
            sourceKind="file",
            sourceId=fileId,
            fileName=fileName,
            mimeType=mimeType,
            userId=userId,
            featureInstanceId=featureInstanceId,
            mandateId=mandateId,
            contentObjects=list(contentObjects or []),
            structure=structure,
            containerPath=containerPath,
        )
        handle = await self.requestIngestion(job)
        if handle.index is not None:
            return handle.index
        if handle.status == "duplicate":
            row = None
            try:
                row = self._knowledgeDb.getFileContentIndex(fileId)
            except Exception:
                row = None
            if isinstance(row, dict):
                try:
                    return FileContentIndex(**row)
                except Exception:
                    return None
            return row
        return None

    async def _indexFileInternal(
        self,
        fileId: str,
        fileName: str,
        mimeType: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contentObjects: List[Dict[str, Any]] = None,
        structure: Dict[str, Any] = None,
        containerPath: str = None,
    ) -> FileContentIndex:
        """Index a file's content objects and create embeddings for text chunks.

        This is the main entry point after non-AI extraction has produced content
        objects.

        Args:
            fileId: The file ID.
            fileName: Original file name.
            mimeType: MIME type.
            userId: Owner user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contentObjects: List of extracted content objects, each with keys:
                contentType (str), data (str), contextRef (dict), contentObjectId (str).
            structure: Structural overview of the file.
            containerPath: Path within container if applicable.

        Returns:
            The created FileContentIndex.
        """
        contentObjects = contentObjects or []

        # 1. Resolve scope fields from FileItem (Single Source of Truth)
        #    FileItem lives in poweron_management; its scope/mandateId/featureInstanceId
        #    are authoritative and must be mirrored onto the FileContentIndex.
        resolvedScope = "personal"
        resolvedMandateId = mandateId
        resolvedFeatureInstanceId = featureInstanceId
        resolvedUserId = userId
        _shouldNeutralize = False
        try:
            from modules.datamodels.datamodelFiles import FileItem as _FileItem

            _dbComponent = getattr(self._context, "interfaceDbComponent", None)
            _fileRecords = (
                _dbComponent.getRecordset(_FileItem, recordFilter={"id": fileId})
                if _dbComponent
                else []
            )
            if not _fileRecords:
                from modules.interfaces.interfaceDbManagement import ComponentObjects

                _row = ComponentObjects().db._loadRecord(_FileItem, fileId)
                if _row:
                    _fileRecords = [_row]
            if _fileRecords:
                _fileRecord = _fileRecords[0]
                _get = (
                    (lambda k, d=None: _fileRecord.get(k, d))
                    if isinstance(_fileRecord, dict)
                    else (lambda k, d=None: getattr(_fileRecord, k, d))
                )
                _shouldNeutralize = bool(_get("neutralize", False))
                _fileScope = _get("scope")
                if _fileScope:
                    resolvedScope = _fileScope
                if not resolvedMandateId:
                    resolvedMandateId = str(_get("mandateId", "") or "")
                if not resolvedFeatureInstanceId:
                    resolvedFeatureInstanceId = str(_get("featureInstanceId", "") or "")
                _fileCreatedBy = _get("sysCreatedBy")
                if _fileCreatedBy:
                    resolvedUserId = str(_fileCreatedBy)
        except Exception:
            pass
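
        # Illustrative outcome (example values): given a FileItem row such as
        #   {"scope": "mandate", "mandateId": "m-42", "neutralize": True, "sysCreatedBy": "u-7"}
        # the resolved fields become scope="mandate", mandateId="m-42", userId="u-7",
        # and _shouldNeutralize=True, so the index created below inherits that scope
        # and the content is routed through the neutralization service.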

        # 2. Create FileContentIndex with correct scope from the start
        index = FileContentIndex(
            id=fileId,
            userId=resolvedUserId,
            featureInstanceId=resolvedFeatureInstanceId,
            mandateId=resolvedMandateId,
            scope=resolvedScope,
            fileName=fileName,
            mimeType=mimeType,
            containerPath=containerPath,
            totalObjects=len(contentObjects),
            totalSize=sum(len(obj.get("data", "").encode("utf-8")) for obj in contentObjects),
            structure=structure or {},
            objectSummary=[
                {
                    "id": obj.get("contentObjectId", ""),
                    "type": obj.get("contentType", "other"),
                    "size": len(obj.get("data", "").encode("utf-8")),
                    "ref": obj.get("contextRef", {}),
                }
                for obj in contentObjects
            ],
            status="extracted",
        )
        self._knowledgeDb.upsertFileContentIndex(index)

        # 3. Chunk text content objects and create embeddings
        textObjects = [o for o in contentObjects if o.get("contentType") == "text"]

        _neutralSvc = None
        if _shouldNeutralize:
            try:
                _neutralSvc = self._getService("neutralization")
            except Exception:
                logger.warning(f"Neutralization service unavailable for file {fileId}")

        if _shouldNeutralize and textObjects:
            _neutralizedObjects = []
            if _neutralSvc:
                for _obj in textObjects:
                    _textContent = (_obj.get("data", "") or "").strip()
                    if not _textContent:
                        continue
                    try:
                        _neutralResult = await _neutralSvc.processTextAsync(_textContent, fileId)
                        if _neutralResult and _neutralResult.get("neutralized_text"):
                            _obj["data"] = _neutralResult["neutralized_text"]
                            _neutralizedObjects.append(_obj)
                        else:
                            logger.warning(
                                f"Neutralization failed for file {fileId}, skipping text object (fail-safe)"
                            )
                    except Exception as e:
                        logger.warning(
                            f"Neutralization error for file {fileId}: {e}, skipping text object (fail-safe)"
                        )
                textObjects = _neutralizedObjects
            else:
                logger.warning(
                    f"Neutralization required for file {fileId} but service unavailable, skipping text indexing"
                )
                textObjects = []

        if textObjects:
            self._knowledgeDb.updateFileStatus(fileId, "embedding")
            chunks = _chunkForEmbedding(textObjects, maxTokens=DEFAULT_CHUNK_TOKENS)
            texts = [c["data"] for c in chunks]
            totalChars = sum(len(t) for t in texts)
            estTokens = totalChars // CHARS_PER_TOKEN
            logger.info(
                f"Embedding file {fileId}: {len(textObjects)} text objects -> "
                f"{len(chunks)} chunks, ~{estTokens} tokens total"
            )
            embeddings = await self._embed(texts) if texts else []
            for i, chunk in enumerate(chunks):
                embedding = embeddings[i] if i < len(embeddings) else None
                contentChunk = ContentChunk(
                    contentObjectId=chunk["contentObjectId"],
                    fileId=fileId,
                    userId=resolvedUserId,
                    featureInstanceId=resolvedFeatureInstanceId,
                    contentType="text",
                    data=chunk["data"],
                    contextRef=chunk["contextRef"],
                    embedding=embedding,
                )
                self._knowledgeDb.upsertContentChunk(contentChunk)
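
            # Rough sizing (illustrative): with DEFAULT_CHUNK_TOKENS = 400 and
            # CHARS_PER_TOKEN = 4, a chunk holds at most ~1,600 characters, so a
            # 12,000-character text object yields on the order of 8 chunks (more
            # if sentence boundaries force earlier splits).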

        # 4. Store non-text content objects (images, etc.) without embedding
        nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"]

        if _shouldNeutralize and nonTextObjects and _neutralSvc:
            import base64 as _b64

            _filteredNonText = []
            for _obj in nonTextObjects:
                if _obj.get("contentType") != "image":
                    _filteredNonText.append(_obj)
                    continue
                _imgData = (_obj.get("data", "") or "").strip()
                if not _imgData:
                    _filteredNonText.append(_obj)
                    continue
                try:
                    _imgBytes = _b64.b64decode(_imgData)
                    _imgResult = await _neutralSvc.processImageAsync(_imgBytes, fileName)
                    if _imgResult.get("status") == "ok":
                        _filteredNonText.append(_obj)
                        logger.debug(f"Image chunk OK for file {fileId}, storing")
                    else:
                        logger.warning(
                            f"Image chunk blocked for file {fileId} (PII detected), skipping (fail-safe)"
                        )
                except Exception as _imgErr:
                    logger.warning(
                        f"Image neutralization check failed for file {fileId}: {_imgErr}, skipping (fail-safe)"
                    )
            nonTextObjects = _filteredNonText

        for obj in nonTextObjects:
            contentChunk = ContentChunk(
                contentObjectId=obj.get("contentObjectId", ""),
                fileId=fileId,
                userId=resolvedUserId,
                featureInstanceId=resolvedFeatureInstanceId,
                contentType=obj.get("contentType", "other"),
                data=obj.get("data", ""),
                contextRef=obj.get("contextRef", {}),
                embedding=None,
            )
            self._knowledgeDb.upsertContentChunk(contentChunk)

        # 5. Final upsert ALWAYS — persists scope, neutralization status, etc.
        index.status = "indexed"
        if _shouldNeutralize:
            index.neutralizationStatus = "completed"
            index.isNeutralized = True
        self._knowledgeDb.upsertFileContentIndex(index)

        logger.info(
            "Indexed file %s (%s): %d objects, %d text objects, scope=%s, mandate=%s, instance=%s",
            fileId,
            fileName,
            len(contentObjects),
            len(textObjects),
            resolvedScope,
            resolvedMandateId,
            resolvedFeatureInstanceId,
        )

        if resolvedMandateId:
            try:
                from modules.interfaces.interfaceDbBilling import getRootInterface

                getRootInterface().reconcileMandateStorageBilling(str(resolvedMandateId))
            except Exception as ex:
                logger.warning("reconcileMandateStorageBilling after index failed: %s", ex)

        return index

    # =========================================================================
    # RAG Context Building (3-tier search)
    # =========================================================================

    async def buildAgentContext(
        self,
        currentPrompt: str,
        workflowId: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contextBudget: int = DEFAULT_CONTEXT_BUDGET,
        workflowHintItems: List[Dict[str, Any]] = None,
        isSysAdmin: bool = False,
    ) -> str:
        """Build RAG context for an agent round by searching all layers.

        Layer priority:
            0   - File refs from RoundMemory (always included so the agent knows
                  which files exist in this workflow)
            1   - Instance documents (user's own indexed files)
            1.5 - Semantically relevant RoundMemory entries
            2   - Workflow entities
            3   - Shared knowledge
            4   - Cross-workflow hint (other conversations in this workspace)

        Args:
            currentPrompt: The current user prompt to find relevant context for.
            workflowId: Current workflow ID.
            userId: Current user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contextBudget: Maximum characters for the context string.
            workflowHintItems: Optional pre-built list of other workflow summaries
                for the cross-workflow hint layer.

        Returns:
            Formatted context string for injection into the agent's system prompt.
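
        Example (illustrative layout of the returned string; actual content
        depends on what the searches return):

            ### Known Files
            - report.pdf: quarterly sales report, 12 pages

            ### Relevant Documents
            Revenue grew 8% quarter over quarter... [{'pageIndex': 3}]

            ### Workflow Context
            - customer: Acme GmbH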
""" queryVector = await self._embedSingle(currentPrompt) if not queryVector: return "" builder = _ContextBuilder(budget=contextBudget) # Layer 0: File references from RoundMemory (always included) fileRefMemories = self._knowledgeDb.getRoundMemoriesByType(workflowId, "file_ref") if fileRefMemories: refItems = [ {"key": m.get("key", ""), "value": m.get("summary", "")[:300]} for m in fileRefMemories ] builder.add( priority=0, label="Known Files", items=refItems, isKeyValue=True, maxChars=2000, ) # Layer 1: Scope-based document search (personal + instance + mandate + global) instanceChunks = self._knowledgeDb.semanticSearch( queryVector=queryVector, userId=userId, featureInstanceId=featureInstanceId, mandateId=mandateId, limit=15, minScore=0.65, isSysAdmin=isSysAdmin, ) if instanceChunks: builder.add(priority=1, label="Relevant Documents", items=instanceChunks, maxChars=4000) # Layer 1.5: Semantically relevant RoundMemory entries roundMemories = self._knowledgeDb.semanticSearchRoundMemory( queryVector=queryVector, workflowId=workflowId, limit=10, minScore=0.55, ) if roundMemories: memItems = [] for m in roundMemories: data = m.get("fullData") or m.get("summary", "") memItems.append({ "data": data, "contextRef": { "type": m.get("memoryType", ""), "key": m.get("key", ""), "round": m.get("roundNumber", 0), }, }) seen = {m.get("key") for m in fileRefMemories} if fileRefMemories else set() memItems = [ mi for mi in memItems if mi["contextRef"].get("key") not in seen ] if memItems: builder.add( priority=2, label="Previous Round Context", items=memItems, maxChars=4000, ) # Layer 2: Workflow Layer (current workflow entities & memory) entities = self._knowledgeDb.getWorkflowEntities(workflowId) if entities: builder.add(priority=3, label="Workflow Context", items=entities, isKeyValue=True, maxChars=2000) # Layer 3: Mandate-scoped documents (visible to all mandate users) if mandateId: mandateChunks = self._knowledgeDb.semanticSearch( queryVector=queryVector, scope="mandate", mandateId=mandateId, limit=10, minScore=0.7, isSysAdmin=isSysAdmin, ) if mandateChunks: builder.add(priority=4, label="Shared Knowledge", items=mandateChunks, maxChars=2000) # Layer 4: Cross-workflow hint (other conversations in this workspace) if workflowHintItems: builder.add( priority=5, label="Other Conversations", items=workflowHintItems, isKeyValue=True, maxChars=500, ) return builder.build() # ========================================================================= # Workflow Memory # ========================================================================= async def storeEntity( self, workflowId: str, userId: str, featureInstanceId: str, key: str, value: str, source: str = "extraction", ) -> WorkflowMemory: """Store a key-value entity in workflow memory with optional embedding.""" embedding = await self._embedSingle(f"{key}: {value}") memory = WorkflowMemory( workflowId=workflowId, userId=userId, featureInstanceId=featureInstanceId, key=key, value=value, source=source, embedding=embedding if embedding else None, ) self._knowledgeDb.upsertWorkflowMemory(memory) return memory def getEntities(self, workflowId: str) -> List[Dict[str, Any]]: """Get all entities for a workflow.""" return self._knowledgeDb.getWorkflowEntities(workflowId) # ========================================================================= # File Status # ========================================================================= def getFileStatus(self, fileId: str) -> Optional[str]: """Get the indexing status of a file.""" index = 

    # =========================================================================
    # File Status
    # =========================================================================

    def getFileStatus(self, fileId: str) -> Optional[str]:
        """Get the indexing status of a file."""
        index = self._knowledgeDb.getFileContentIndex(fileId)
        return index.get("status") if index else None

    def isFileIndexed(self, fileId: str) -> bool:
        """Check if a file has been fully indexed."""
        return self.getFileStatus(fileId) == "indexed"

    # =========================================================================
    # On-Demand Extraction (Smart Document Handling)
    # =========================================================================

    async def readSection(self, fileId: str, sectionId: str) -> List[Dict[str, Any]]:
        """Read content objects for a specific section. Uses cache if available.

        Args:
            fileId: Source file ID.
            sectionId: Section identifier from the FileContentIndex structure.

        Returns:
            List of content object dicts with data and contextRef.
        """
        cached = self._knowledgeDb.getContentChunks(fileId)
        sectionChunks = [
            c for c in (cached or [])
            if (c.get("contextRef", {}).get("sectionId") == sectionId)
        ]
        if sectionChunks:
            return sectionChunks

        index = self._knowledgeDb.getFileContentIndex(fileId)
        if not index:
            return []
        structure = index.get("structure", {}) if isinstance(index, dict) else getattr(index, "structure", {})
        sections = structure.get("sections", [])
        section = next((s for s in sections if s.get("id") == sectionId), None)
        if not section:
            return []

        startPage = section.get("startPage", 0)
        endPage = section.get("endPage", startPage)
        return await self._extractPagesOnDemand(fileId, startPage, endPage, sectionId)

    async def readContentObjects(
        self, fileId: str, filter: Dict[str, Any] = None
    ) -> List[Dict[str, Any]]:
        """Read content objects with optional filters (pageIndex, contentType, sectionId).

        Args:
            fileId: Source file ID.
            filter: Optional dict with keys pageIndex (list[int]), contentType (str),
                sectionId (str).

        Returns:
            Filtered list of content chunk dicts.
        """
        filter = filter or {}
        chunks = self._knowledgeDb.getContentChunks(fileId) or []

        if "pageIndex" in filter:
            targetPages = filter["pageIndex"]
            if isinstance(targetPages, int):
                targetPages = [targetPages]
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("pageIndex") in targetPages
            ]
        if "contentType" in filter:
            chunks = [c for c in chunks if c.get("contentType") == filter["contentType"]]
        if "sectionId" in filter:
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("sectionId") == filter["sectionId"]
            ]
        return chunks
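
    # Illustrative usage (sketch; example IDs only):
    #
    #   pageText = await knowledgeService.readContentObjects(
    #       fileId, {"pageIndex": [0, 1], "contentType": "text"}
    #   )
    #   sectionText = await knowledgeService.readSection(fileId, "sec-intro")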
""" existing = self._knowledgeDb.getFileContentIndex(fileId) if existing: existingPath = existing.get("containerPath") if isinstance(existing, dict) else getattr(existing, "containerPath", None) if existingPath == containerPath: return existing logger.info(f"On-demand extraction for {containerPath} in file {fileId}") return None async def _extractPagesOnDemand( self, fileId: str, startPage: int, endPage: int, sectionId: str ) -> List[Dict[str, Any]]: """Extract specific pages from a file and cache in knowledge store.""" try: chatService = self._getService("chat") fileContent = chatService.getFileContent(fileId) if not fileContent: return [] fileData = fileContent.get("data", b"") mimeType = fileContent.get("mimeType", "") fileName = fileContent.get("fileName", "") if isinstance(fileData, str): import base64 fileData = base64.b64decode(fileData) if mimeType != "application/pdf": return [] try: import fitz except ImportError: return [] doc = fitz.open(stream=fileData, filetype="pdf") results = [] for pageIdx in range(startPage, min(endPage + 1, len(doc))): page = doc[pageIdx] text = page.get_text() or "" if not text.strip(): continue chunk = ContentChunk( contentObjectId=f"page-{pageIdx}", fileId=fileId, userId=self._context.user.id if self._context.user else "", featureInstanceId=self._context.feature_instance_id or "", contentType="text", data=text, contextRef={ "containerPath": fileName, "location": f"page:{pageIdx+1}", "pageIndex": pageIdx, "sectionId": sectionId, }, ) embedding = await self._embedSingle(text[:2000]) if embedding: chunk.embedding = embedding self._knowledgeDb.upsertContentChunk(chunk) results.append(chunk.model_dump()) doc.close() return results except Exception as e: logger.error(f"On-demand page extraction failed: {e}") return [] def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]: """Get the FileContentIndex for a file.""" return self._knowledgeDb.getFileContentIndex(fileId) # ============================================================================= # Internal helpers # ============================================================================= def _estimateTokens(text: str) -> int: """Estimate token count using character-based heuristic (1 token ~ 4 chars).""" return max(1, len(text) // CHARS_PER_TOKEN) def _splitSentences(text: str) -> List[str]: """Split text into sentences at common boundaries (.!?) followed by whitespace.""" parts = re.split(r'(?<=[.!?])\s+', text.replace("\n", " ").strip()) return [p for p in parts if p.strip()] def _hardSplitByTokens(text: str, maxTokens: int) -> List[str]: """Force-split text into pieces that each fit within maxTokens. Used as safety net when sentence splitting produces oversized segments. Splits at word boundaries where possible. """ maxChars = maxTokens * CHARS_PER_TOKEN pieces = [] while len(text) > maxChars: splitAt = text.rfind(" ", 0, maxChars) if splitAt <= 0: splitAt = maxChars pieces.append(text[:splitAt].strip()) text = text[splitAt:].strip() if text: pieces.append(text) return pieces def _chunkForEmbedding( textObjects: List[Dict[str, Any]], maxTokens: int = DEFAULT_CHUNK_TOKENS ) -> List[Dict[str, Any]]: """Split text content objects into token-aware chunks suitable for embedding. Each chunk preserves the contextRef from its source object. Splits at sentence boundaries; applies hard-cap if a single sentence exceeds maxTokens. 
""" chunks = [] for obj in textObjects: text = (obj.get("data", "") or "").strip() if not text: continue contentObjectId = obj.get("contentObjectId", "") contextRef = obj.get("contextRef", {}) if _estimateTokens(text) <= maxTokens: chunks.append({"data": text, "contentObjectId": contentObjectId, "contextRef": contextRef}) continue sentences = _splitSentences(text) currentChunk = "" for sentence in sentences: if _estimateTokens(sentence) > maxTokens: if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) currentChunk = "" for piece in _hardSplitByTokens(sentence, maxTokens): chunks.append({"data": piece, "contentObjectId": contentObjectId, "contextRef": contextRef}) continue candidate = f"{currentChunk} {sentence}".strip() if currentChunk else sentence if _estimateTokens(candidate) > maxTokens: if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) currentChunk = sentence else: currentChunk = candidate if currentChunk.strip(): chunks.append({"data": currentChunk.strip(), "contentObjectId": contentObjectId, "contextRef": contextRef}) return chunks class _ContextBuilder: """Assembles RAG context from multiple sources respecting a character budget.""" def __init__(self, budget: int): self._budget = budget self._sections: List[Dict[str, Any]] = [] def add( self, priority: int, label: str, items: List[Dict[str, Any]], isKeyValue: bool = False, maxChars: int = 0, ): self._sections.append({ "priority": priority, "label": label, "items": items, "isKeyValue": isKeyValue, "maxChars": maxChars, }) def build(self) -> str: self._sections.sort(key=lambda s: s["priority"]) parts = [] remaining = self._budget for section in self._sections: if remaining <= 0: break sectionCap = section.get("maxChars") or remaining sectionRemaining = min(sectionCap, remaining) header = f"### {section['label']}\n" sectionText = header sectionRemaining -= len(header) for item in section["items"]: if sectionRemaining <= 0: break if section["isKeyValue"]: line = f"- {item.get('key', '')}: {item.get('value', '')}\n" else: data = item.get("data", "") ref = item.get("contextRef", {}) refStr = f" [{ref}]" if ref else "" line = f"{data}{refStr}\n" if len(line) <= sectionRemaining: sectionText += line sectionRemaining -= len(line) consumed = min(sectionCap, remaining) - sectionRemaining remaining -= consumed parts.append(sectionText) return "\n".join(parts).strip()