# Copyright (c) 2026 PowerOn AG
# All rights reserved.
"""
Interface to CommCoach database.
Uses the PostgreSQL connector for data access with strict user ownership.
"""

import json
import logging
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelUam import User
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.dbHelpers.dbRegistry import registerDatabase
from modules.shared.timeUtils import getIsoTimestamp, getUtcTimestamp
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import resolveText, t
from .datamodelCommcoach import (
    TrainingModule, TrainingModuleStatus,
    CoachingSession, CoachingSessionStatus,
    CoachingMessage,
    CoachingTask, CoachingTaskStatus,
    CoachingScore,
    CoachingUserProfile,
    CoachingPersona,
    ModulePersonaMapping,
    CoachingBadge,
)

logger = logging.getLogger(__name__)

commcoachDatabase = "poweron_commcoach"
registerDatabase(commcoachDatabase)

# Process-wide cache of interface instances, keyed by user / mandate / feature instance.
_interfaces: Dict[str, "CommcoachObjects"] = {}


def getInterface(
    currentUser: User,
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> "CommcoachObjects":
    """Factory: get or create a CommcoachObjects interface instance.

    Instances are cached in the module-level ``_interfaces`` dict under the key
    ``"{currentUser.id}_{mandateId}_{featureInstanceId}"``. On a cache hit the
    cached instance's ``currentUser``, ``mandateId`` and ``featureInstanceId``
    attributes are refreshed with the supplied values before it is returned.
    """
    key = f"{currentUser.id}_{mandateId}_{featureInstanceId}"
    cached = _interfaces.get(key)
    if cached is None:
        cached = CommcoachObjects(currentUser, mandateId, featureInstanceId)
        _interfaces[key] = cached
    else:
        cached.currentUser = currentUser
        cached.mandateId = mandateId
        cached.featureInstanceId = featureInstanceId
    return cached


class CommcoachObjects:
    """Database interface for CommCoach feature.

    Intended to enforce strict userId ownership on reads. List-style getters
    filter on the ``userId`` passed by the caller; by-id getters (``getModule``,
    ``getSession``, ``getTask``, ``getPersona``) filter on ``id`` only.
    NOTE(review): whether the connector adds an ownership check for those by-id
    lookups is not visible in this file - confirm, or check ownership in callers.
    """

    def __init__(
        self,
        currentUser: User,
        mandateId: Optional[str] = None,
        featureInstanceId: Optional[str] = None,
    ):
        """Store the caller context and open a connector to the CommCoach database.

        ``userId`` is ``str(currentUser.id)``, or ``"system"`` when ``currentUser``
        is falsy. Connection settings are read from ``APP_CONFIG``.
        """
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = str(currentUser.id) if currentUser else "system"

        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = commcoachDatabase
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))

        self.db = DatabaseConnector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )

    @staticmethod
    def _countSessionsByModule(sessions: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count session records per ``moduleId``; records without a moduleId are skipped."""
        counts: Dict[str, int] = {}
        for session in sessions:
            moduleId = session.get("moduleId")
            if moduleId:
                counts[moduleId] = counts.get(moduleId, 0) + 1
        return counts

    # =========================================================================
    # Modules (formerly Contexts)
    # =========================================================================

    def getModules(self, instanceId: str, userId: str, includeArchived: bool = False) -> List[Dict[str, Any]]:
        """Get all training modules for a user.

        Archived modules are dropped unless ``includeArchived`` is true. Each
        returned record gets a live ``sessionCount`` computed from the sessions
        table. Result is sorted newest first by ``updatedAt`` (falling back to
        ``createdAt``).
        """
        records = self.db.getRecordset(
            TrainingModule,
            recordFilter={"instanceId": instanceId, "userId": userId},
        )
        if not includeArchived:
            records = [r for r in records if r.get("status") != TrainingModuleStatus.ARCHIVED.value]

        allSessions = self.db.getRecordset(
            CoachingSession,
            recordFilter={"instanceId": instanceId, "userId": userId},
        )
        countByModule = self._countSessionsByModule(allSessions)
        for r in records:
            r["sessionCount"] = countByModule.get(r.get("id", ""), 0)

        records.sort(key=lambda r: r.get("updatedAt") or r.get("createdAt") or "", reverse=True)
        return records

    def getModule(self, moduleId: str) -> Optional[Dict[str, Any]]:
        """Return the training module with the given id, or None if there is none."""
        records = self.db.getRecordset(TrainingModule, recordFilter={"id": moduleId})
        return records[0] if records else None

    def createModule(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a training module. Stamps createdAt/updatedAt on ``data`` in place."""
        # One timestamp for both fields so a fresh record has createdAt == updatedAt.
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(TrainingModule, data)

    def updateModule(self, moduleId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Modify a training module. Stamps updatedAt on ``updates`` in place."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(TrainingModule, moduleId, updates)

    def deleteModule(self, moduleId: str) -> bool:
        """Delete a module after deleting its sessions (with messages), tasks and scores.

        NOTE(review): ModulePersonaMapping rows for the module are not removed
        here - confirm whether they should be cascaded too.
        """
        self._deleteSessionsByModule(moduleId)
        self._deleteTasksByModule(moduleId)
        self._deleteScoresByModule(moduleId)
        return self.db.recordDelete(TrainingModule, moduleId)

    # =========================================================================
    # Sessions
    # =========================================================================

    def getSessions(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Return the user's sessions for a module, newest ``startedAt`` first."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("startedAt") or 0, reverse=True)
        return records

    def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]:
        """Return the session with the given id, or None if there is none."""
        records = self.db.getRecordset(CoachingSession, recordFilter={"id": sessionId})
        return records[0] if records else None

    def getActiveSession(self, moduleId: str, userId: str) -> Optional[Dict[str, Any]]:
        """Return the first ACTIVE session of the user for the module, or None."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={"moduleId": moduleId, "userId": userId, "status": CoachingSessionStatus.ACTIVE.value},
        )
        return records[0] if records else None

    def createSession(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a session. Stamps createdAt/updatedAt (ISO) and startedAt (UTC timestamp) in place."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        data["startedAt"] = getUtcTimestamp()
        return self.db.recordCreate(CoachingSession, data)

    def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Modify a session. Stamps updatedAt on ``updates`` in place."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingSession, sessionId, updates)

    def _deleteSessionsByModule(self, moduleId: str) -> int:
        """Delete every session of a module (and each session's messages); return the session count."""
        records = self.db.getRecordset(CoachingSession, recordFilter={"moduleId": moduleId})
        count = 0
        for record in records:
            self._deleteMessagesBySession(record.get("id"))
            self.db.recordDelete(CoachingSession, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Messages
    # =========================================================================

    def getMessages(self, sessionId: str) -> List[Dict[str, Any]]:
        """Return all messages of a session, oldest ``createdAt`` first."""
        records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId})
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentMessages(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]:
        """Return the last ``count`` messages of a session in chronological order.

        A ``count`` of zero or less yields an empty list.
        """
        if count <= 0:
            # records[-0:] is records[0:], i.e. everything - guard explicitly.
            return []
        records = self.getMessages(sessionId)
        return records[-count:]

    def createMessage(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a message. Stamps createdAt on ``data`` in place."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingMessage, data)

    def _deleteMessagesBySession(self, sessionId: str) -> int:
        """Delete every message of a session; return how many were deleted."""
        records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingMessage, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Tasks
    # =========================================================================

    def getTasks(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Return the user's tasks for a module, newest ``createdAt`` first."""
        records = self.db.getRecordset(
            CoachingTask,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        return records

    def getTask(self, taskId: str) -> Optional[Dict[str, Any]]:
        """Return the task with the given id, or None if there is none."""
        records = self.db.getRecordset(CoachingTask, recordFilter={"id": taskId})
        return records[0] if records else None

    def createTask(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a task. Stamps createdAt/updatedAt on ``data`` in place."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingTask, data)

    def updateTask(self, taskId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Modify a task. Stamps updatedAt on ``updates`` in place."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingTask, taskId, updates)

    def deleteTask(self, taskId: str) -> bool:
        """Delete the task with the given id."""
        return self.db.recordDelete(CoachingTask, taskId)

    def _deleteTasksByModule(self, moduleId: str) -> int:
        """Delete every task of a module; return how many were deleted."""
        records = self.db.getRecordset(CoachingTask, recordFilter={"moduleId": moduleId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingTask, record.get("id"))
            count += 1
        return count

    def getOpenTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is OPEN or IN_PROGRESS.

        NOTE(review): ``instanceId`` is accepted but not applied to the filter,
        so tasks are counted across all instances - confirm whether CoachingTask
        carries an instanceId that should be filtered on.
        """
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        openStates = (CoachingTaskStatus.OPEN.value, CoachingTaskStatus.IN_PROGRESS.value)
        return sum(1 for r in records if r.get("status") in openStates)

    def getCompletedTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is DONE.

        NOTE(review): ``instanceId`` is accepted but not applied to the filter -
        see getOpenTaskCount.
        """
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        return sum(1 for r in records if r.get("status") == CoachingTaskStatus.DONE.value)

    # =========================================================================
    # Scores
    # =========================================================================

    def getScores(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Return the user's scores for a module, oldest ``createdAt`` first."""
        records = self.db.getRecordset(
            CoachingScore,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentScores(self, userId: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the user's scores, newest ``createdAt`` first."""
        records = self.db.getRecordset(CoachingScore, recordFilter={"userId": userId})
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        return records[:limit]

    def createScore(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a score. Stamps createdAt on ``data`` in place."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingScore, data)

    def _deleteScoresByModule(self, moduleId: str) -> int:
        """Delete every score of a module; return how many were deleted."""
        records = self.db.getRecordset(CoachingScore, recordFilter={"moduleId": moduleId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingScore, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Personas
    # =========================================================================

    def getPersonas(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Return active personas: those owned by ``"system"`` plus the user's own for this instance.

        A persona without an ``isActive`` key is treated as active.
        """
        builtinPersonas = self.db.getRecordset(CoachingPersona, recordFilter={"userId": "system"})
        customPersonas = self.db.getRecordset(
            CoachingPersona, recordFilter={"userId": userId, "instanceId": instanceId}
        )
        candidates = builtinPersonas + customPersonas
        return [p for p in candidates if p.get("isActive", True)]

    def getPersona(self, personaId: str) -> Optional[Dict[str, Any]]:
        """Return the persona with the given id, or None if there is none."""
        records = self.db.getRecordset(CoachingPersona, recordFilter={"id": personaId})
        return records[0] if records else None

    def createPersona(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a persona. Stamps createdAt/updatedAt on ``data`` in place."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingPersona, data)

    def updatePersona(self, personaId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Modify a persona. Stamps updatedAt on ``updates`` in place."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingPersona, personaId, updates)

    def deletePersona(self, personaId: str) -> bool:
        """Delete the persona with the given id."""
        return self.db.recordDelete(CoachingPersona, personaId)

    def getAllPersonas(self, instanceId: str) -> List[Dict[str, Any]]:
        """All personas (builtin + custom for this instance), including inactive."""
        builtinPersonas = self.db.getRecordset(CoachingPersona, recordFilter={"userId": "system"})
        customPersonas = self.db.getRecordset(CoachingPersona, recordFilter={"instanceId": instanceId})
        # Drop system-owned rows from the instance query so builtins are not listed twice.
        customPersonas = [p for p in customPersonas if p.get("userId") != "system"]
        return builtinPersonas + customPersonas

    # =========================================================================
    # Module-Persona Mapping
    # =========================================================================

    def getModulePersonas(self, moduleId: str) -> List[Dict[str, Any]]:
        """Return the persona mapping records of a module."""
        return self.db.getRecordset(ModulePersonaMapping, recordFilter={"moduleId": moduleId})

    def setModulePersonas(self, moduleId: str, personaIds: List[str], instanceId: str) -> List[Dict[str, Any]]:
        """Replace a module's persona mappings with one mapping per id in ``personaIds``.

        Existing mappings are deleted first; the newly created records are returned.
        """
        existing = self.db.getRecordset(ModulePersonaMapping, recordFilter={"moduleId": moduleId})
        for rec in existing:
            self.db.recordDelete(ModulePersonaMapping, rec["id"])

        created = []
        for pId in personaIds:
            data = ModulePersonaMapping(
                moduleId=moduleId,
                personaId=pId,
                instanceId=instanceId,
            ).model_dump()
            now = getIsoTimestamp()
            data["createdAt"] = now
            data["updatedAt"] = now
            created.append(self.db.recordCreate(ModulePersonaMapping, data))
        return created

    # =========================================================================
    # Badges
    # =========================================================================

    def getBadges(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Return the user's badges for an instance, newest ``awardedAt`` first."""
        records = self.db.getRecordset(CoachingBadge, recordFilter={"userId": userId, "instanceId": instanceId})
        records.sort(key=lambda r: r.get("awardedAt") or 0, reverse=True)
        return records

    def hasBadge(self, userId: str, instanceId: str, badgeKey: str) -> bool:
        """Return True if the user has a badge with ``badgeKey`` in this instance."""
        records = self.db.getRecordset(
            CoachingBadge,
            recordFilter={"userId": userId, "instanceId": instanceId, "badgeKey": badgeKey},
        )
        return len(records) > 0

    def awardBadge(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a badge. Stamps awardedAt (UTC timestamp) and createdAt (ISO) in place."""
        data["awardedAt"] = getUtcTimestamp()
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingBadge, data)

    # =========================================================================
    # Score History
    # =========================================================================

    def getScoreHistory(self, moduleId: str, userId: str) -> Dict[str, List[Dict[str, Any]]]:
        """Group the user's scores for a module by ``dimension``.

        Scores without a dimension are grouped under ``"unknown"``. Each entry
        carries score, trend, evidence, createdAt and sessionId; every group is
        sorted by ``createdAt`` ascending.
        """
        scores = self.getScores(moduleId, userId)
        history: Dict[str, List[Dict[str, Any]]] = {}
        for s in scores:
            dim = s.get("dimension", "unknown")
            history.setdefault(dim, []).append({
                "score": s.get("score"),
                "trend": s.get("trend"),
                "evidence": s.get("evidence"),
                "createdAt": s.get("createdAt"),
                "sessionId": s.get("sessionId"),
            })
        for entries in history.values():
            entries.sort(key=lambda x: x.get("createdAt") or "")
        return history

    # =========================================================================
    # User Profile
    # =========================================================================

    def getProfile(self, userId: str, instanceId: str) -> Optional[Dict[str, Any]]:
        """Return the user's profile for an instance, or None if there is none."""
        records = self.db.getRecordset(
            CoachingUserProfile,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        return records[0] if records else None

    def getOrCreateProfile(self, userId: str, mandateId: str, instanceId: str) -> Dict[str, Any]:
        """Return the user's profile for an instance, creating a default one if missing."""
        existing = self.getProfile(userId, instanceId)
        if existing:
            return existing
        data = CoachingUserProfile(
            userId=userId,
            mandateId=mandateId,
            instanceId=instanceId,
        ).model_dump()
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingUserProfile, data)

    def updateProfile(self, profileId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Modify a profile. Stamps updatedAt on ``updates`` in place."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingUserProfile, profileId, updates)

    # =========================================================================
    # Dashboard Aggregation
    # =========================================================================

    def getDashboardData(self, userId: str, instanceId: str) -> Dict[str, Any]:
        """Aggregate module, session, score, task, badge and level data for the dashboard.

        ``averageScore`` is the mean of all sessions' parseable ``competenceScore``
        values rounded to one decimal, or None when no session has one.
        ``totalMinutes`` is the summed ``durationSeconds`` of all sessions in whole
        minutes; sessions with a missing or null duration contribute 0.
        """
        modules = self.db.getRecordset(TrainingModule, recordFilter={"userId": userId, "instanceId": instanceId})
        sessions = self.db.getRecordset(CoachingSession, recordFilter={"userId": userId, "instanceId": instanceId})
        profile = self.getProfile(userId, instanceId)

        activeModules = [m for m in modules if m.get("status") == TrainingModuleStatus.ACTIVE.value]
        # `or 0` also covers a key that is present with a None value.
        totalMinutes = sum((s.get("durationSeconds") or 0) for s in sessions) // 60

        scores = []
        for s in sessions:
            raw = s.get("competenceScore")
            if raw is not None:
                try:
                    scores.append(float(raw))
                except (ValueError, TypeError):
                    # Unparseable score values are left out of the average.
                    pass
        avgScore = sum(scores) / len(scores) if scores else None

        recentScores = self.getRecentScores(userId, limit=10)

        countByModule = self._countSessionsByModule(sessions)

        moduleSummaries = []
        for mod in activeModules:
            modId = mod.get("id", "")
            moduleSummaries.append({
                "id": modId,
                "title": mod.get("title"),
                "moduleType": mod.get("moduleType"),
                "sessionCount": countByModule.get(modId, 0),
                "lastSessionAt": mod.get("lastSessionAt"),
            })

        return {
            "totalModules": len(modules),
            "activeModules": len(activeModules),
            "totalSessions": len(sessions),
            "totalMinutes": totalMinutes,
            "streakDays": profile.get("streakDays", 0) if profile else 0,
            "longestStreak": profile.get("longestStreak", 0) if profile else 0,
            # Explicit None check: a genuine average of 0.0 must be reported as 0.0.
            "averageScore": round(avgScore, 1) if avgScore is not None else None,
            "recentScores": recentScores,
            "openTasks": self.getOpenTaskCount(userId, instanceId),
            "completedTasks": self.getCompletedTaskCount(userId, instanceId),
            "modules": moduleSummaries,
            "badges": self.getBadges(userId, instanceId),
            "level": _calcLevel(len(sessions)),
        }


# Level table, highest threshold first: (min total sessions, level number, code, label key).
_LEVELS = [
    (50, 5, "master", "Meister"),
    (25, 4, "expert", "Experte"),
    (10, 3, "advanced", "Fortgeschritten"),
    (3, 2, "engaged", "Engagiert"),
]

# Label keys passed through t() at import time.
# NOTE(review): presumably this registers them with the i18n registry - confirm.
t("Meister")
t("Experte")
t("Fortgeschritten")
t("Engagiert")
t("Einsteiger")


def _calcLevel(totalSessions: int) -> Dict[str, Any]:
    """Map a total session count to its level descriptor.

    Returns the first ``_LEVELS`` entry whose threshold is reached, or level 1
    (``"beginner"``) when none is. The label is resolved via ``resolveText``.
    """
    for threshold, number, code, labelKey in _LEVELS:
        if totalSessions >= threshold:
            return {"number": number, "code": code, "label": resolveText(labelKey), "totalSessions": totalSessions}
    return {"number": 1, "code": "beginner", "label": resolveText("Einsteiger"), "totalSessions": totalSessions}