# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to the Knowledge Store database (poweron_knowledge).
Provides CRUD for FileContentIndex, ContentChunk, WorkflowMemory
and semantic search via pgvector.
"""

import logging
from typing import Dict, Any, List, Optional

from modules.connectors.connectorDbPostgre import _get_cached_connector
from modules.datamodels.datamodelKnowledge import FileContentIndex, ContentChunk, WorkflowMemory
from modules.datamodels.datamodelUam import User
from modules.shared.configuration import APP_CONFIG
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

# Process-wide registry of interface instances; only the "default" key is used.
_instances: Dict[str, "KnowledgeObjects"] = {}


class KnowledgeObjects:
    """Interface to the Knowledge Store database.

    Manages FileContentIndex, ContentChunk, and WorkflowMemory with semantic search.
    All persistence is delegated to the connector stored in ``self.db``.
    """

    def __init__(self):
        """Create the interface without a user context and connect to the database."""
        self.currentUser: Optional[User] = None
        self.userId: Optional[str] = None
        self._initializeDatabase()

    def _initializeDatabase(self):
        """Obtain the connector for the ``poweron_knowledge`` database.

        Host, user, password and port are read from ``APP_CONFIG``; the database
        name is fixed. ``self.userId`` is still ``None`` when this runs from
        ``__init__``; the user is attached later via ``setUserContext``.
        """
        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = "poweron_knowledge"
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))

        self.db = _get_cached_connector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )
        logger.info("Knowledge Store database initialized")

    def setUserContext(self, user: User):
        """Set the acting user and forward the user id to the connector.

        NOTE(review): when ``user`` is falsy, ``self.userId`` is cleared but the
        connector keeps whatever context it had before, because
        ``updateContext`` is only called for a truthy id. Confirm this is
        intended.
        """
        self.currentUser = user
        self.userId = user.id if user else None
        if self.userId:
            self.db.updateContext(self.userId)

    # =========================================================================
    # Internal helpers
    # =========================================================================

    def _upsertRecord(self, modelClass: type, record: Any) -> Dict[str, Any]:
        """Modify the row with ``record.id`` if it exists, otherwise create it.

        Args:
            modelClass: Data model class identifying the table.
            record: Model instance exposing ``id`` and ``model_dump()``.

        Returns:
            Whatever the connector returns from ``recordModify`` / ``recordCreate``.
        """
        data = record.model_dump()
        if self.db._loadRecord(modelClass, record.id):
            return self.db.recordModify(modelClass, record.id, data)
        return self.db.recordCreate(modelClass, data)

    def _deleteByFilter(self, modelClass: type, recordFilter: Dict[str, Any]) -> int:
        """Delete every row matching ``recordFilter``.

        Returns:
            Number of rows for which the connector reported a successful delete.
        """
        deleted = 0
        for row in self.db.getRecordset(modelClass, recordFilter=recordFilter):
            if self.db.recordDelete(modelClass, row["id"]):
                deleted += 1
        return deleted

    # =========================================================================
    # FileContentIndex CRUD
    # =========================================================================

    def upsertFileContentIndex(self, index: FileContentIndex) -> Dict[str, Any]:
        """Create or update a FileContentIndex entry."""
        return self._upsertRecord(FileContentIndex, index)

    def getFileContentIndex(self, fileId: str) -> Optional[Dict[str, Any]]:
        """Get a FileContentIndex by file ID."""
        return self.db._loadRecord(FileContentIndex, fileId)

    def getFileContentIndexByUser(
        self, userId: str, featureInstanceId: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """Get all FileContentIndex entries for a user.

        Args:
            userId: Owner of the entries.
            featureInstanceId: Optional additional filter; ignored when falsy.
        """
        recordFilter = {"userId": userId}
        if featureInstanceId:
            recordFilter["featureInstanceId"] = featureInstanceId
        return self.db.getRecordset(FileContentIndex, recordFilter=recordFilter)

    def updateFileStatus(self, fileId: str, status: str) -> bool:
        """Update the processing status of a FileContentIndex.

        Returns:
            False when no entry exists for ``fileId``, True after the update.
        """
        if not self.db._loadRecord(FileContentIndex, fileId):
            return False
        self.db.recordModify(FileContentIndex, fileId, {"status": status})
        return True

    def deleteFileContentIndex(self, fileId: str) -> bool:
        """Delete a FileContentIndex and all associated ContentChunks.

        Chunks are removed first; the return value is the connector's result for
        deleting the index entry itself.
        """
        self.deleteContentChunks(fileId)
        return self.db.recordDelete(FileContentIndex, fileId)

    # =========================================================================
    # ContentChunk CRUD
    # =========================================================================

    def upsertContentChunk(self, chunk: ContentChunk) -> Dict[str, Any]:
        """Create or update a ContentChunk."""
        return self._upsertRecord(ContentChunk, chunk)

    def upsertContentChunks(self, chunks: List[ContentChunk]) -> int:
        """Batch upsert multiple ContentChunks. Returns count of upserted chunks."""
        count = 0
        for chunk in chunks:
            self.upsertContentChunk(chunk)
            count += 1
        return count

    def getContentChunks(self, fileId: str) -> List[Dict[str, Any]]:
        """Get all ContentChunks for a file."""
        return self.db.getRecordset(ContentChunk, recordFilter={"fileId": fileId})

    def deleteContentChunks(self, fileId: str) -> int:
        """Delete all ContentChunks for a file. Returns count of deleted chunks."""
        return self._deleteByFilter(ContentChunk, {"fileId": fileId})

    # =========================================================================
    # WorkflowMemory CRUD
    # =========================================================================

    def upsertWorkflowMemory(self, memory: WorkflowMemory) -> Dict[str, Any]:
        """Create or update a WorkflowMemory entry."""
        return self._upsertRecord(WorkflowMemory, memory)

    def getWorkflowEntities(self, workflowId: str) -> List[Dict[str, Any]]:
        """Get all WorkflowMemory entries for a workflow."""
        return self.db.getRecordset(WorkflowMemory, recordFilter={"workflowId": workflowId})

    def getWorkflowEntity(self, workflowId: str, key: str) -> Optional[Dict[str, Any]]:
        """Get a specific WorkflowMemory entry by workflow and key.

        Returns:
            The first matching record, or None when nothing matches.
        """
        results = self.db.getRecordset(
            WorkflowMemory, recordFilter={"workflowId": workflowId, "key": key}
        )
        return results[0] if results else None

    def deleteWorkflowMemory(self, workflowId: str) -> int:
        """Delete all WorkflowMemory entries for a workflow. Returns count."""
        return self._deleteByFilter(WorkflowMemory, {"workflowId": workflowId})

    # =========================================================================
    # Semantic Search
    # =========================================================================

    def semanticSearch(
        self,
        queryVector: List[float],
        userId: Optional[str] = None,
        featureInstanceId: Optional[str] = None,
        mandateId: Optional[str] = None,
        isShared: Optional[bool] = None,
        limit: int = 10,
        minScore: Optional[float] = None,
        contentType: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """Semantic search across ContentChunks via the connector's vector search.

        Args:
            queryVector: Query embedding vector.
            userId: Filter by user (Instance Layer); ignored when falsy.
            featureInstanceId: Filter by feature instance; ignored when falsy.
            mandateId: Reserved for Shared Layer lookups. Accepted for interface
                compatibility but NOT applied as a filter by this method.
            isShared: Reserved for Shared Layer search via FileContentIndex.
                Accepted for interface compatibility but NOT applied.
            limit: Max results.
            minScore: Minimum similarity score passed through to the connector.
            contentType: Filter by content type (text, image, etc.).

        Returns:
            The connector's search result for ContentChunk on the ``embedding``
            column (presumably records carrying a ``_score`` field, ordered by
            relevance; verify against the connector).
        """
        if mandateId is not None or isShared is not None:
            # Make the silently ignored arguments visible to whoever debugs
            # unexpectedly broad search results.
            logger.debug(
                "semanticSearch: mandateId/isShared are not applied as filters "
                "(mandateId=%s, isShared=%s)",
                mandateId,
                isShared,
            )

        recordFilter: Dict[str, Any] = {}
        if userId:
            recordFilter["userId"] = userId
        if featureInstanceId:
            recordFilter["featureInstanceId"] = featureInstanceId
        if contentType:
            recordFilter["contentType"] = contentType

        return self.db.semanticSearch(
            modelClass=ContentChunk,
            vectorColumn="embedding",
            queryVector=queryVector,
            limit=limit,
            # The connector receives None rather than an empty filter dict.
            recordFilter=recordFilter or None,
            minScore=minScore,
        )

    def semanticSearchWorkflowMemory(
        self,
        queryVector: List[float],
        workflowId: str,
        limit: int = 5,
        minScore: Optional[float] = None,
    ) -> List[Dict[str, Any]]:
        """Semantic search across WorkflowMemory entries of one workflow."""
        return self.db.semanticSearch(
            modelClass=WorkflowMemory,
            vectorColumn="embedding",
            queryVector=queryVector,
            limit=limit,
            recordFilter={"workflowId": workflowId},
            minScore=minScore,
        )


def getInterface(currentUser: Optional[User] = None) -> KnowledgeObjects:
    """Get or create a KnowledgeObjects singleton.

    The same instance is returned to every caller. When ``currentUser`` is
    given, its context is set on that shared instance and therefore replaces
    the context seen by all other holders of the interface.
    """
    if "default" not in _instances:
        _instances["default"] = KnowledgeObjects()

    interface = _instances["default"]
    if currentUser:
        interface.setUserContext(currentUser)
    return interface