gateway/modules/interfaces/interfaceDbKnowledge.py
2026-03-15 23:38:21 +01:00

234 lines
9 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to the Knowledge Store database (poweron_knowledge).
Provides CRUD for FileContentIndex, ContentChunk, WorkflowMemory
and semantic search via pgvector.
"""
import logging
from typing import Dict, Any, List, Optional
from modules.connectors.connectorDbPostgre import _get_cached_connector
from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk, WorkflowMemory
from modules.datamodels.datamodelUam import User
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Process-wide cache of KnowledgeObjects singletons; populated by getInterface().
# Currently only the "default" key is used.
_instances: Dict[str, "KnowledgeObjects"] = {}
class KnowledgeObjects:
    """Interface to the Knowledge Store database (poweron_knowledge).

    Manages FileContentIndex, ContentChunk, and WorkflowMemory records and
    exposes semantic search via pgvector cosine similarity.  A user context
    may be attached after construction via :meth:`setUserContext`.
    """

    def __init__(self):
        # User context is attached later via setUserContext(); the database
        # connection is opened immediately.
        self.currentUser: Optional[User] = None
        self.userId: Optional[str] = None
        self._initializeDatabase()

    def _initializeDatabase(self):
        """Open (or reuse) the cached connector to the poweron_knowledge DB."""
        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = "poweron_knowledge"  # fixed database name for the Knowledge Store
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
        self.db = _get_cached_connector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )
        logger.info("Knowledge Store database initialized")

    def setUserContext(self, user: User):
        """Attach *user* to this interface and propagate the id to the connector."""
        self.currentUser = user
        self.userId = user.id if user else None
        if self.userId:
            self.db.updateContext(self.userId)

    # =========================================================================
    # FileContentIndex CRUD
    # =========================================================================
    def upsertFileContentIndex(self, index: FileContentIndex) -> Dict[str, Any]:
        """Create or update a FileContentIndex entry, keyed by ``index.id``."""
        data = index.model_dump()
        existing = self.db._loadRecord(FileContentIndex, index.id)
        if existing:
            return self.db.recordModify(FileContentIndex, index.id, data)
        return self.db.recordCreate(FileContentIndex, data)

    def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Get a FileContentIndex by file ID, or None if it does not exist."""
        return self.db._loadRecord(FileContentIndex, fileId)

    def getFileContentIndexByUser(
        self, userId: str, featureInstanceId: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get all FileContentIndex entries for a user.

        Args:
            userId: Owner of the entries.
            featureInstanceId: Optional additional filter by feature instance.
        """
        recordFilter: Dict[str, Any] = {"userId": userId}
        if featureInstanceId:
            recordFilter["featureInstanceId"] = featureInstanceId
        return self.db.getRecordset(FileContentIndex, recordFilter=recordFilter)

    def updateFileStatus(self, fileId: str, status: str) -> bool:
        """Update the processing status of a FileContentIndex.

        Returns:
            True if the record existed and was updated, False otherwise.
        """
        existing = self.db._loadRecord(FileContentIndex, fileId)
        if not existing:
            return False
        self.db.recordModify(FileContentIndex, fileId, {"status": status})
        return True

    def deleteFileContentIndex(self, fileId: str) -> bool:
        """Delete a FileContentIndex and all associated ContentChunks.

        Returns:
            The connector's result for deleting the FileContentIndex itself.
        """
        # Remove dependent chunks first so no orphaned chunks remain.
        self.deleteContentChunks(fileId)
        return self.db.recordDelete(FileContentIndex, fileId)

    # =========================================================================
    # ContentChunk CRUD
    # =========================================================================
    def upsertContentChunk(self, chunk: ContentChunk) -> Dict[str, Any]:
        """Create or update a single ContentChunk, keyed by ``chunk.id``."""
        data = chunk.model_dump()
        existing = self.db._loadRecord(ContentChunk, chunk.id)
        if existing:
            return self.db.recordModify(ContentChunk, chunk.id, data)
        return self.db.recordCreate(ContentChunk, data)

    def upsertContentChunks(self, chunks: List[ContentChunk]) -> int:
        """Batch upsert multiple ContentChunks. Returns count of upserted chunks."""
        for chunk in chunks:
            self.upsertContentChunk(chunk)
        return len(chunks)

    def getContentChunks(self, fileId: str) -> List[Dict[str, Any]]:
        """Get all ContentChunks belonging to a file."""
        return self.db.getRecordset(ContentChunk, recordFilter={"fileId": fileId})

    def deleteContentChunks(self, fileId: str) -> int:
        """Delete all ContentChunks for a file. Returns count of deleted chunks."""
        chunks = self.db.getRecordset(ContentChunk, recordFilter={"fileId": fileId})
        count = 0
        for chunk in chunks:
            # Count only deletions the connector reports as successful.
            if self.db.recordDelete(ContentChunk, chunk["id"]):
                count += 1
        return count

    # =========================================================================
    # WorkflowMemory CRUD
    # =========================================================================
    def upsertWorkflowMemory(self, memory: WorkflowMemory) -> Dict[str, Any]:
        """Create or update a WorkflowMemory entry, keyed by ``memory.id``."""
        data = memory.model_dump()
        existing = self.db._loadRecord(WorkflowMemory, memory.id)
        if existing:
            return self.db.recordModify(WorkflowMemory, memory.id, data)
        return self.db.recordCreate(WorkflowMemory, data)

    def getWorkflowEntities(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all WorkflowMemory entries for a workflow."""
        return self.db.getRecordset(WorkflowMemory, recordFilter={"workflowId": workflowId})

    def getWorkflowEntity(self, workflowId: str, key: str) -> Optional[Dict[str, Any]]:
        """Get a specific WorkflowMemory entry by workflow and key.

        Returns:
            The first matching record, or None if no entry matches.
        """
        results = self.db.getRecordset(
            WorkflowMemory, recordFilter={"workflowId": workflowId, "key": key}
        )
        return results[0] if results else None

    def deleteWorkflowMemory(self, workflowId: str) -> int:
        """Delete all WorkflowMemory entries for a workflow. Returns count."""
        entries = self.db.getRecordset(WorkflowMemory, recordFilter={"workflowId": workflowId})
        count = 0
        for entry in entries:
            if self.db.recordDelete(WorkflowMemory, entry["id"]):
                count += 1
        return count

    # =========================================================================
    # Semantic Search
    # =========================================================================
    def semanticSearch(
        self,
        queryVector: List[float],
        userId: Optional[str] = None,
        featureInstanceId: Optional[str] = None,
        mandateId: Optional[str] = None,
        isShared: Optional[bool] = None,
        limit: int = 10,
        minScore: Optional[float] = None,
        contentType: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Semantic search across ContentChunks using pgvector cosine similarity.

        Args:
            queryVector: Query embedding vector.
            userId: Filter by user (Instance Layer).
            featureInstanceId: Filter by feature instance.
            mandateId: Filter by mandate (for Shared Layer lookups).
                NOTE(review): not yet applied — see warning below.
            isShared: If True, search Shared Layer via FileContentIndex join.
                NOTE(review): not yet applied — see warning below.
            limit: Max results.
            minScore: Minimum cosine similarity (0.0 - 1.0).
            contentType: Filter by content type (text, image, etc.).

        Returns:
            List of ContentChunk records with _score field, sorted by relevance.
        """
        if mandateId is not None or isShared is not None:
            # The Shared Layer lookup (mandateId / isShared via a
            # FileContentIndex join) is not implemented here yet; warn so
            # callers are not silently given unfiltered results.
            logger.warning(
                "semanticSearch: mandateId/isShared filters are not yet applied"
            )
        recordFilter: Dict[str, Any] = {}
        if userId:
            recordFilter["userId"] = userId
        if featureInstanceId:
            recordFilter["featureInstanceId"] = featureInstanceId
        if contentType:
            recordFilter["contentType"] = contentType
        return self.db.semanticSearch(
            modelClass=ContentChunk,
            vectorColumn="embedding",
            queryVector=queryVector,
            limit=limit,
            recordFilter=recordFilter if recordFilter else None,
            minScore=minScore,
        )

    def semanticSearchWorkflowMemory(
        self,
        queryVector: List[float],
        workflowId: str,
        limit: int = 5,
        minScore: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Semantic search across WorkflowMemory entries of one workflow."""
        return self.db.semanticSearch(
            modelClass=WorkflowMemory,
            vectorColumn="embedding",
            queryVector=queryVector,
            limit=limit,
            recordFilter={"workflowId": workflowId},
            minScore=minScore,
        )
def getInterface(currentUser: Optional[User] = None) -> KnowledgeObjects:
    """Return the shared KnowledgeObjects instance, creating it on first use.

    When *currentUser* is provided, the user context on the shared instance
    is refreshed before it is returned.
    """
    interface = _instances.get("default")
    if interface is None:
        interface = KnowledgeObjects()
        _instances["default"] = interface
    if currentUser:
        interface.setUserContext(currentUser)
    return interface