# gateway/modules/serviceCenter/services/serviceKnowledge/mainServiceKnowledge.py
# (563 lines, 21 KiB, Python)
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""Knowledge service: 3-tier RAG with indexing, semantic search, and context building."""
import logging
import re
from typing import Any, Callable, Dict, List, Optional
from modules.datamodels.datamodelKnowledge import (
FileContentIndex, ContentChunk, WorkflowMemory,
)
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
from modules.interfaces.interfaceDbKnowledge import getInterface as getKnowledgeInterface
from modules.shared.timeUtils import getUtcTimestamp
logger = logging.getLogger(__name__)
# Rough heuristic used throughout this module: ~4 characters per LLM token.
CHARS_PER_TOKEN = 4
# Target token size for a single embedding chunk.
DEFAULT_CHUNK_TOKENS = 400
# Default maximum number of characters of RAG context injected per agent round.
DEFAULT_CONTEXT_BUDGET = 8000
class KnowledgeService:
    """Service for Knowledge Store operations: indexing, retrieval, and context building.

    Implements the 3-tier RAG pipeline used by ``buildAgentContext``:
      1. Instance layer  - the user's own indexed documents (semantic search).
      2. Workflow layer  - key/value entities remembered for the current workflow.
      3. Shared layer    - mandate-wide shared documents (semantic search).
    """

    def __init__(self, context, get_service: Callable[[str], Any]) -> None:
        # Request context; provides .user here and (in on-demand extraction)
        # .feature_instance_id.
        self._context = context
        # Service locator: resolves sibling services by name ("ai", "chat", ...).
        self._getService = get_service
        # Knowledge DB interface scoped to the requesting user.
        self._knowledgeDb = getKnowledgeInterface(context.user)
    # =========================================================================
    # Embedding helper
    # =========================================================================
    async def _embed(self, texts: List[str]) -> List[List[float]]:
        """Embed texts via AiService (respects allowedProviders).

        Returns:
            One embedding vector per input text, or an empty list on failure.
        """
        aiService = self._getService("ai")
        response = await aiService.callEmbedding(texts)
        if response.errorCount > 0:
            logger.error(f"Embedding failed: {response.content}")
            return []
        # Embedding vectors are transported in the response metadata, not content.
        return (response.metadata or {}).get("embeddings", [])
    async def _embedSingle(self, text: str) -> List[float]:
        """Embed a single text. Returns empty list on failure."""
        results = await self._embed([text])
        return results[0] if results else []
    # =========================================================================
    # File Indexing (called after extraction, before embedding)
    # =========================================================================
    async def indexFile(
        self,
        fileId: str,
        fileName: str,
        mimeType: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contentObjects: Optional[List[Dict[str, Any]]] = None,
        structure: Optional[Dict[str, Any]] = None,
        containerPath: Optional[str] = None,
    ) -> FileContentIndex:
        """Index a file's content objects and create embeddings for text chunks.

        This is the main entry point after non-AI extraction has produced content objects.

        Args:
            fileId: The file ID.
            fileName: Original file name.
            mimeType: MIME type.
            userId: Owner user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contentObjects: List of extracted content objects, each with keys:
                contentType (str), data (str), contextRef (dict), contentObjectId (str).
            structure: Structural overview of the file.
            containerPath: Path within container if applicable.

        Returns:
            The created FileContentIndex.
        """
        contentObjects = contentObjects or []
        # 1. Create FileContentIndex
        # The index reuses the fileId as its own id (1:1 file <-> index).
        index = FileContentIndex(
            id=fileId,
            userId=userId,
            featureInstanceId=featureInstanceId,
            mandateId=mandateId,
            fileName=fileName,
            mimeType=mimeType,
            containerPath=containerPath,
            totalObjects=len(contentObjects),
            # Sizes are measured in UTF-8 bytes, not characters.
            totalSize=sum(len(obj.get("data", "").encode("utf-8")) for obj in contentObjects),
            structure=structure or {},
            objectSummary=[
                {
                    "id": obj.get("contentObjectId", ""),
                    "type": obj.get("contentType", "other"),
                    "size": len(obj.get("data", "").encode("utf-8")),
                    "ref": obj.get("contextRef", {}),
                }
                for obj in contentObjects
            ],
            status="extracted",
        )
        self._knowledgeDb.upsertFileContentIndex(index)
        # 2. Chunk text content objects and create embeddings
        textObjects = [o for o in contentObjects if o.get("contentType") == "text"]
        if textObjects:
            self._knowledgeDb.updateFileStatus(fileId, "embedding")
            chunks = _chunkForEmbedding(textObjects, maxTokens=DEFAULT_CHUNK_TOKENS)
            texts = [c["data"] for c in chunks]
            totalChars = sum(len(t) for t in texts)
            estTokens = totalChars // CHARS_PER_TOKEN
            logger.info(
                f"Embedding file {fileId}: {len(textObjects)} text objects -> "
                f"{len(chunks)} chunks, ~{estTokens} tokens total"
            )
            embeddings = await self._embed(texts) if texts else []
            for i, chunk in enumerate(chunks):
                # If embedding failed (or returned fewer vectors than chunks),
                # chunks are stored without an embedding rather than dropped.
                embedding = embeddings[i] if i < len(embeddings) else None
                contentChunk = ContentChunk(
                    contentObjectId=chunk["contentObjectId"],
                    fileId=fileId,
                    userId=userId,
                    featureInstanceId=featureInstanceId,
                    contentType="text",
                    data=chunk["data"],
                    contextRef=chunk["contextRef"],
                    embedding=embedding,
                )
                self._knowledgeDb.upsertContentChunk(contentChunk)
        # 3. Store non-text content objects (images, etc.) without embedding
        nonTextObjects = [o for o in contentObjects if o.get("contentType") != "text"]
        for obj in nonTextObjects:
            contentChunk = ContentChunk(
                contentObjectId=obj.get("contentObjectId", ""),
                fileId=fileId,
                userId=userId,
                featureInstanceId=featureInstanceId,
                contentType=obj.get("contentType", "other"),
                data=obj.get("data", ""),
                contextRef=obj.get("contextRef", {}),
                embedding=None,
            )
            self._knowledgeDb.upsertContentChunk(contentChunk)
        self._knowledgeDb.updateFileStatus(fileId, "indexed")
        # Keep the returned object in sync with the status just persisted.
        index.status = "indexed"
        logger.info(f"Indexed file {fileId} ({fileName}): {len(contentObjects)} objects, {len(textObjects)} text chunks")
        return index
    # =========================================================================
    # RAG Context Building (3-tier search)
    # =========================================================================
    async def buildAgentContext(
        self,
        currentPrompt: str,
        workflowId: str,
        userId: str,
        featureInstanceId: str = "",
        mandateId: str = "",
        contextBudget: int = DEFAULT_CONTEXT_BUDGET,
    ) -> str:
        """Build RAG context for an agent round by searching all 3 layers.

        Args:
            currentPrompt: The current user prompt to find relevant context for.
            workflowId: Current workflow ID.
            userId: Current user.
            featureInstanceId: Feature instance scope.
            mandateId: Mandate scope.
            contextBudget: Maximum characters for the context string.

        Returns:
            Formatted context string for injection into the agent's system prompt.
            Empty string if the prompt could not be embedded.
        """
        queryVector = await self._embedSingle(currentPrompt)
        if not queryVector:
            # Without a query vector no semantic search is possible.
            return ""
        builder = _ContextBuilder(budget=contextBudget)
        # Layer 1: Instance Layer (user's own documents, highest priority)
        instanceChunks = self._knowledgeDb.semanticSearch(
            queryVector=queryVector,
            userId=userId,
            featureInstanceId=featureInstanceId,
            limit=15,
            minScore=0.65,
        )
        if instanceChunks:
            builder.add(priority=1, label="Relevant Documents", items=instanceChunks)
        # Layer 2: Workflow Layer (current workflow entities & memory)
        entities = self._knowledgeDb.getWorkflowEntities(workflowId)
        if entities:
            builder.add(priority=2, label="Workflow Context", items=entities, isKeyValue=True)
        # Layer 3: Shared Layer (mandate-wide shared documents)
        # Slightly stricter minScore than layer 1: shared content must be more
        # clearly relevant before it displaces the user's own documents.
        sharedChunks = self._knowledgeDb.semanticSearch(
            queryVector=queryVector,
            mandateId=mandateId,
            isShared=True,
            limit=10,
            minScore=0.7,
        )
        if sharedChunks:
            builder.add(priority=3, label="Shared Knowledge", items=sharedChunks)
        return builder.build()
    # =========================================================================
    # Workflow Memory
    # =========================================================================
    async def storeEntity(
        self,
        workflowId: str,
        userId: str,
        featureInstanceId: str,
        key: str,
        value: str,
        source: str = "extraction",
    ) -> WorkflowMemory:
        """Store a key-value entity in workflow memory with optional embedding.

        The "key: value" pair is embedded as one string; on embedding failure
        the entity is still stored, just without a vector.
        """
        embedding = await self._embedSingle(f"{key}: {value}")
        memory = WorkflowMemory(
            workflowId=workflowId,
            userId=userId,
            featureInstanceId=featureInstanceId,
            key=key,
            value=value,
            source=source,
            embedding=embedding if embedding else None,
        )
        self._knowledgeDb.upsertWorkflowMemory(memory)
        return memory
    def getEntities(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all entities for a workflow."""
        return self._knowledgeDb.getWorkflowEntities(workflowId)
    # =========================================================================
    # File Status
    # =========================================================================
    def getFileStatus(self, fileId: str) -> Optional[str]:
        """Get the indexing status of a file, or None if no index exists.

        NOTE(review): assumes the DB returns a dict here (`.get("status")`),
        while readSection/extractContainerItem defensively handle both dict
        and model objects - confirm the interface's actual return type.
        """
        index = self._knowledgeDb.getFileContentIndex(fileId)
        return index.get("status") if index else None
    def isFileIndexed(self, fileId: str) -> bool:
        """Check if a file has been fully indexed."""
        return self.getFileStatus(fileId) == "indexed"
    # =========================================================================
    # On-Demand Extraction (Smart Document Handling)
    # =========================================================================
    async def readSection(self, fileId: str, sectionId: str) -> List[Dict[str, Any]]:
        """Read content objects for a specific section. Uses cache if available.

        Falls back to on-demand PDF page extraction when no cached chunks
        carry the requested sectionId.

        Args:
            fileId: Source file ID.
            sectionId: Section identifier from the FileContentIndex structure.

        Returns:
            List of content object dicts with data and contextRef.
        """
        cached = self._knowledgeDb.getContentChunks(fileId)
        sectionChunks = [
            c for c in (cached or [])
            if (c.get("contextRef", {}).get("sectionId") == sectionId)
        ]
        if sectionChunks:
            return sectionChunks
        index = self._knowledgeDb.getFileContentIndex(fileId)
        if not index:
            return []
        # Tolerate both dict and model-object index representations.
        structure = index.get("structure", {}) if isinstance(index, dict) else getattr(index, "structure", {})
        sections = structure.get("sections", [])
        section = next((s for s in sections if s.get("id") == sectionId), None)
        if not section:
            return []
        startPage = section.get("startPage", 0)
        endPage = section.get("endPage", startPage)
        return await self._extractPagesOnDemand(fileId, startPage, endPage, sectionId)
    async def readContentObjects(
        self, fileId: str, filter: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """Read content objects with optional filters (pageIndex, contentType, sectionId).

        NOTE(review): the parameter name `filter` shadows the builtin; kept
        as-is because renaming would break keyword callers.

        Args:
            fileId: Source file ID.
            filter: Optional dict with keys pageIndex (list[int] or int),
                contentType (str), sectionId (str). Filters are ANDed.

        Returns:
            Filtered list of content chunk dicts.
        """
        filter = filter or {}
        chunks = self._knowledgeDb.getContentChunks(fileId) or []
        if "pageIndex" in filter:
            targetPages = filter["pageIndex"]
            # Accept a single page index as well as a list of them.
            if isinstance(targetPages, int):
                targetPages = [targetPages]
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("pageIndex") in targetPages
            ]
        if "contentType" in filter:
            chunks = [c for c in chunks if c.get("contentType") == filter["contentType"]]
        if "sectionId" in filter:
            chunks = [
                c for c in chunks
                if c.get("contextRef", {}).get("sectionId") == filter["sectionId"]
            ]
        return chunks
    async def extractContainerItem(
        self, fileId: str, containerPath: str
    ) -> Optional[Dict[str, Any]]:
        """On-demand extraction of a specific item within a container.

        If the item is already indexed, returns existing data.

        NOTE(review): the extraction branch is not implemented - on a cache
        miss this only logs the request and returns None.

        Args:
            fileId: The container file ID.
            containerPath: Path within the container (e.g. "folder/report.pdf").

        Returns:
            FileContentIndex dict for the extracted item, or None.
        """
        existing = self._knowledgeDb.getFileContentIndex(fileId)
        if existing:
            # Tolerate both dict and model-object index representations.
            existingPath = existing.get("containerPath") if isinstance(existing, dict) else getattr(existing, "containerPath", None)
            if existingPath == containerPath:
                return existing
        logger.info(f"On-demand extraction for {containerPath} in file {fileId}")
        return None
    async def _extractPagesOnDemand(
        self, fileId: str, startPage: int, endPage: int, sectionId: str
    ) -> List[Dict[str, Any]]:
        """Extract specific pages from a file and cache in knowledge store.

        PDF-only; returns an empty list for other MIME types, when the file
        cannot be loaded, when PyMuPDF is unavailable, or on any error
        (best-effort: failures are logged, never raised).
        """
        try:
            chatService = self._getService("chat")
            fileContent = chatService.getFileContent(fileId)
            if not fileContent:
                return []
            fileData = fileContent.get("data", b"")
            mimeType = fileContent.get("mimeType", "")
            fileName = fileContent.get("fileName", "")
            if isinstance(fileData, str):
                # File payload may arrive base64-encoded as a string.
                import base64
                fileData = base64.b64decode(fileData)
            if mimeType != "application/pdf":
                return []
            try:
                # PyMuPDF is an optional dependency; degrade gracefully.
                import fitz
            except ImportError:
                return []
            doc = fitz.open(stream=fileData, filetype="pdf")
            results = []
            # endPage is inclusive; clamp to the actual document length.
            for pageIdx in range(startPage, min(endPage + 1, len(doc))):
                page = doc[pageIdx]
                text = page.get_text() or ""
                if not text.strip():
                    continue
                chunk = ContentChunk(
                    contentObjectId=f"page-{pageIdx}",
                    fileId=fileId,
                    # NOTE(review): snake_case attribute access here, unlike
                    # the camelCase used elsewhere - confirm context API.
                    userId=self._context.user.id if self._context.user else "",
                    featureInstanceId=self._context.feature_instance_id or "",
                    contentType="text",
                    data=text,
                    contextRef={
                        "containerPath": fileName,
                        "location": f"page:{pageIdx+1}",
                        "pageIndex": pageIdx,
                        "sectionId": sectionId,
                    },
                )
                # Only the first 2000 chars are embedded to bound cost.
                embedding = await self._embedSingle(text[:2000])
                if embedding:
                    chunk.embedding = embedding
                self._knowledgeDb.upsertContentChunk(chunk)
                results.append(chunk.model_dump())
            doc.close()
            return results
        except Exception as e:
            logger.error(f"On-demand page extraction failed: {e}")
            return []
    def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Get the FileContentIndex for a file."""
        return self._knowledgeDb.getFileContentIndex(fileId)
# =============================================================================
# Internal helpers
# =============================================================================
def _estimateTokens(text: str) -> int:
    """Estimate token count using character-based heuristic (1 token ~ 4 chars)."""
    approx = len(text) // CHARS_PER_TOKEN
    # Never report zero tokens, even for empty text.
    return approx if approx > 0 else 1
def _splitSentences(text: str) -> List[str]:
"""Split text into sentences at common boundaries (.!?) followed by whitespace."""
parts = re.split(r'(?<=[.!?])\s+', text.replace("\n", " ").strip())
return [p for p in parts if p.strip()]
def _hardSplitByTokens(text: str, maxTokens: int) -> List[str]:
    """Force-split text into pieces that each fit within maxTokens.

    Used as safety net when sentence splitting produces oversized segments.
    Splits at word boundaries where possible.
    """
    maxChars = maxTokens * CHARS_PER_TOKEN
    segments: List[str] = []
    remainder = text
    while len(remainder) > maxChars:
        # Prefer the last word boundary inside the window; otherwise cut hard.
        boundary = remainder.rfind(" ", 0, maxChars)
        if boundary <= 0:
            boundary = maxChars
        segments.append(remainder[:boundary].strip())
        remainder = remainder[boundary:].strip()
    if remainder:
        segments.append(remainder)
    return segments
def _chunkForEmbedding(
    textObjects: List[Dict[str, Any]], maxTokens: int = DEFAULT_CHUNK_TOKENS
) -> List[Dict[str, Any]]:
    """Split text content objects into token-aware chunks suitable for embedding.

    Each chunk preserves the contextRef from its source object.
    Splits at sentence boundaries; applies hard-cap if a single sentence exceeds maxTokens.
    """
    result: List[Dict[str, Any]] = []
    for obj in textObjects:
        raw = (obj.get("data", "") or "").strip()
        if not raw:
            continue
        objId = obj.get("contentObjectId", "")
        ref = obj.get("contextRef", {})

        def emit(data: str) -> None:
            # Every chunk carries its source object's id and contextRef.
            result.append({"data": data, "contentObjectId": objId, "contextRef": ref})

        # Small objects pass through as a single chunk.
        if _estimateTokens(raw) <= maxTokens:
            emit(raw)
            continue
        buffer = ""
        for sentence in _splitSentences(raw):
            if _estimateTokens(sentence) > maxTokens:
                # Oversized sentence: flush the buffer, then hard-split it.
                if buffer.strip():
                    emit(buffer.strip())
                buffer = ""
                for piece in _hardSplitByTokens(sentence, maxTokens):
                    emit(piece)
                continue
            merged = f"{buffer} {sentence}".strip() if buffer else sentence
            if _estimateTokens(merged) > maxTokens:
                # Adding this sentence would overflow: flush and restart.
                if buffer.strip():
                    emit(buffer.strip())
                buffer = sentence
            else:
                buffer = merged
        if buffer.strip():
            emit(buffer.strip())
    return result
class _ContextBuilder:
"""Assembles RAG context from multiple sources respecting a character budget."""
def __init__(self, budget: int):
self._budget = budget
self._sections: List[Dict[str, Any]] = []
def add(
self,
priority: int,
label: str,
items: List[Dict[str, Any]],
isKeyValue: bool = False,
):
self._sections.append({
"priority": priority,
"label": label,
"items": items,
"isKeyValue": isKeyValue,
})
def build(self) -> str:
self._sections.sort(key=lambda s: s["priority"])
parts = []
remaining = self._budget
for section in self._sections:
if remaining <= 0:
break
header = f"### {section['label']}\n"
sectionText = header
remaining -= len(header)
for item in section["items"]:
if remaining <= 0:
break
if section["isKeyValue"]:
line = f"- {item.get('key', '')}: {item.get('value', '')}\n"
else:
data = item.get("data", "")
ref = item.get("contextRef", {})
score = item.get("_score", "")
refStr = f" [{ref}]" if ref else ""
line = f"{data}{refStr}\n"
if len(line) <= remaining:
sectionText += line
remaining -= len(line)
parts.append(sectionText)
return "\n".join(parts).strip()