# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to the CommCoach database.

Uses the PostgreSQL connector for data access. List queries are scoped by the
userId (and, where the caller supplies one, the instanceId) passed in by the
caller. Lookups by primary key are NOT ownership-checked in this module.
"""

import json
import logging
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelUam import User
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.dbRegistry import registerDatabase
from modules.shared.timeUtils import getIsoTimestamp, getUtcTimestamp
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import resolveText, t

from .datamodelCommcoach import (
    TrainingModule,
    TrainingModuleStatus,
    CoachingSession,
    CoachingSessionStatus,
    CoachingMessage,
    CoachingTask,
    CoachingTaskStatus,
    CoachingScore,
    CoachingUserProfile,
)

logger = logging.getLogger(__name__)

commcoachDatabase = "poweron_commcoach"
registerDatabase(commcoachDatabase)

# Cache of interface instances, keyed by "<userId>_<mandateId>_<featureInstanceId>".
_interfaces = {}


def getInterface(
    currentUser: Optional[User],
    mandateId: Optional[str] = None,
    featureInstanceId: Optional[str] = None,
) -> "CommcoachObjects":
    """Factory: get or create a CommcoachObjects interface instance.

    Instances are cached per (user id, mandateId, featureInstanceId). On a
    cache hit the context attributes are refreshed with the values passed in.

    Args:
        currentUser: The acting user. ``None`` maps to the "system" key, the
            same fallback ``CommcoachObjects.__init__`` applies.
        mandateId: Optional mandate context.
        featureInstanceId: Optional feature-instance context.

    Returns:
        The cached or newly created interface.
    """
    # Fall back to "system" instead of raising AttributeError on a missing user.
    userKey = currentUser.id if currentUser is not None else "system"
    key = f"{userKey}_{mandateId}_{featureInstanceId}"

    interface = _interfaces.get(key)
    if interface is None:
        interface = CommcoachObjects(currentUser, mandateId, featureInstanceId)
        _interfaces[key] = interface
    else:
        interface.currentUser = currentUser
        interface.mandateId = mandateId
        interface.featureInstanceId = featureInstanceId
    return interface


class CommcoachObjects:
    """Database interface for the CommCoach feature.

    List reads filter on the userId supplied by the caller. Single-record
    lookups by id (getModule, getSession, getTask, getPersona, getMessages)
    apply no user filter here.
    NOTE(review): ownership of by-id lookups presumably has to be verified by
    the caller -- confirm.
    """

    def __init__(
        self,
        currentUser: Optional[User],
        mandateId: Optional[str] = None,
        featureInstanceId: Optional[str] = None,
    ):
        """Store the request context and open a connector to the CommCoach database."""
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = str(currentUser.id) if currentUser else "system"

        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = commcoachDatabase
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))

        self.db = DatabaseConnector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )

    # =========================================================================
    # Internal helpers
    # =========================================================================

    @staticmethod
    def _countSessionsByModule(sessions: List[Dict[str, Any]]) -> Dict[str, int]:
        """Count sessions per moduleId; sessions without a moduleId are skipped."""
        counts: Dict[str, int] = {}
        for session in sessions:
            moduleId = session.get("moduleId")
            if moduleId:
                counts[moduleId] = counts.get(moduleId, 0) + 1
        return counts

    def _deleteWhere(self, model: Any, recordFilter: Dict[str, Any]) -> int:
        """Delete every record of `model` matching `recordFilter`; return the number processed."""
        records = self.db.getRecordset(model, recordFilter=recordFilter)
        for record in records:
            self.db.recordDelete(model, record.get("id"))
        return len(records)

    # =========================================================================
    # Modules (formerly Contexts)
    # =========================================================================

    def getModules(self, instanceId: str, userId: str, includeArchived: bool = False) -> List[Dict[str, Any]]:
        """Get all training modules for a user, newest first.

        Each record is enriched with a live ``sessionCount`` computed from the
        sessions table. Archived modules are dropped unless `includeArchived`.
        Ordering uses ``updatedAt``, falling back to ``createdAt``.
        """
        records = self.db.getRecordset(
            TrainingModule,
            recordFilter={"instanceId": instanceId, "userId": userId},
        )
        if not includeArchived:
            archived = TrainingModuleStatus.ARCHIVED.value
            records = [r for r in records if r.get("status") != archived]

        allSessions = self.db.getRecordset(
            CoachingSession,
            recordFilter={"instanceId": instanceId, "userId": userId},
        )
        countByModule = self._countSessionsByModule(allSessions)
        for record in records:
            record["sessionCount"] = countByModule.get(record.get("id", ""), 0)

        records.sort(key=lambda r: r.get("updatedAt") or r.get("createdAt") or "", reverse=True)
        return records

    def getModule(self, moduleId: str) -> Optional[Dict[str, Any]]:
        """Return the module with the given id, or None. No user filter is applied."""
        records = self.db.getRecordset(TrainingModule, recordFilter={"id": moduleId})
        return records[0] if records else None

    def createModule(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt/updatedAt and create the module."""
        # One clock read so both timestamps of a new record are identical.
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(TrainingModule, data)

    def updateModule(self, moduleId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stamp `updates` (in place) with updatedAt and apply them to the module."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(TrainingModule, moduleId, updates)

    def deleteModule(self, moduleId: str) -> bool:
        """Delete a module together with its dependent records.

        Cascades to sessions (and their messages), tasks, scores and
        module-persona mappings before removing the module itself.
        """
        self._deleteSessionsByModule(moduleId)
        self._deleteTasksByModule(moduleId)
        self._deleteScoresByModule(moduleId)
        # Mappings reference the module by id; remove them so none are orphaned.
        self._deleteModulePersonasByModule(moduleId)
        return self.db.recordDelete(TrainingModule, moduleId)

    # =========================================================================
    # Sessions
    # =========================================================================

    def getSessions(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's sessions for a module, most recently started first."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("startedAt") or 0, reverse=True)
        return records

    def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]:
        """Return the session with the given id, or None. No user filter is applied."""
        records = self.db.getRecordset(CoachingSession, recordFilter={"id": sessionId})
        return records[0] if records else None

    def getActiveSession(self, moduleId: str, userId: str) -> Optional[Dict[str, Any]]:
        """Return the first ACTIVE session of the user in the module, or None."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={
                "moduleId": moduleId,
                "userId": userId,
                "status": CoachingSessionStatus.ACTIVE.value,
            },
        )
        return records[0] if records else None

    def createSession(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt/updatedAt/startedAt and create the session."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        data["startedAt"] = getUtcTimestamp()
        return self.db.recordCreate(CoachingSession, data)

    def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stamp `updates` (in place) with updatedAt and apply them to the session."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingSession, sessionId, updates)

    def _deleteSessionsByModule(self, moduleId: str) -> int:
        """Delete all sessions of a module, and each session's messages; return the session count."""
        sessions = self.db.getRecordset(CoachingSession, recordFilter={"moduleId": moduleId})
        for session in sessions:
            sessionId = session.get("id")
            # Messages first, so none are left pointing at a deleted session.
            self._deleteMessagesBySession(sessionId)
            self.db.recordDelete(CoachingSession, sessionId)
        return len(sessions)

    # =========================================================================
    # Messages
    # =========================================================================

    def getMessages(self, sessionId: str) -> List[Dict[str, Any]]:
        """Get all messages of a session, oldest first (by createdAt)."""
        records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId})
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentMessages(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]:
        """Get the last `count` messages of a session, oldest first.

        A non-positive `count` yields an empty list.
        """
        if count <= 0:
            # records[-0:] would be the whole list, so handle this explicitly.
            return []
        return self.getMessages(sessionId)[-count:]

    def createMessage(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt and create the message."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingMessage, data)

    def _deleteMessagesBySession(self, sessionId: str) -> int:
        """Delete all messages of a session; return how many were processed."""
        return self._deleteWhere(CoachingMessage, {"sessionId": sessionId})

    # =========================================================================
    # Tasks
    # =========================================================================

    def getTasks(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's tasks for a module, newest first (by createdAt)."""
        records = self.db.getRecordset(
            CoachingTask,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        return records

    def getTask(self, taskId: str) -> Optional[Dict[str, Any]]:
        """Return the task with the given id, or None. No user filter is applied."""
        records = self.db.getRecordset(CoachingTask, recordFilter={"id": taskId})
        return records[0] if records else None

    def createTask(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt/updatedAt and create the task."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingTask, data)

    def updateTask(self, taskId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stamp `updates` (in place) with updatedAt and apply them to the task."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingTask, taskId, updates)

    def deleteTask(self, taskId: str) -> bool:
        """Delete the task with the given id."""
        return self.db.recordDelete(CoachingTask, taskId)

    def _deleteTasksByModule(self, moduleId: str) -> int:
        """Delete all tasks of a module; return how many were processed."""
        return self._deleteWhere(CoachingTask, {"moduleId": moduleId})

    def getOpenTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is OPEN or IN_PROGRESS.

        NOTE(review): `instanceId` is accepted but not used in the filter, so
        the count spans all instances -- confirm whether CoachingTask carries
        an instanceId that should be filtered on.
        """
        openStatuses = (CoachingTaskStatus.OPEN.value, CoachingTaskStatus.IN_PROGRESS.value)
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        return sum(1 for r in records if r.get("status") in openStatuses)

    def getCompletedTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is DONE.

        NOTE(review): `instanceId` is accepted but not used in the filter --
        see getOpenTaskCount.
        """
        done = CoachingTaskStatus.DONE.value
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        return sum(1 for r in records if r.get("status") == done)

    # =========================================================================
    # Scores
    # =========================================================================

    def getScores(self, moduleId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's scores for a module, oldest first (by createdAt)."""
        records = self.db.getRecordset(
            CoachingScore,
            recordFilter={"moduleId": moduleId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentScores(self, userId: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Get the user's newest scores across all modules, newest first.

        At most `limit` records are returned; a non-positive `limit` yields [].
        """
        records = self.db.getRecordset(CoachingScore, recordFilter={"userId": userId})
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        # Clamp so a negative limit cannot turn into "all but the last N".
        return records[:max(limit, 0)]

    def createScore(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt and create the score."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingScore, data)

    def _deleteScoresByModule(self, moduleId: str) -> int:
        """Delete all scores of a module; return how many were processed."""
        return self._deleteWhere(CoachingScore, {"moduleId": moduleId})

    # =========================================================================
    # Personas
    # =========================================================================

    def getPersonas(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Get active personas: built-ins (userId "system") plus the user's own for this instance.

        A persona without an ``isActive`` field counts as active.
        """
        from .datamodelCommcoach import CoachingPersona

        builtins = self.db.getRecordset(CoachingPersona, recordFilter={"userId": "system"})
        custom = self.db.getRecordset(
            CoachingPersona,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        personas = builtins + custom
        return [p for p in personas if p.get("isActive", True)]

    def getPersona(self, personaId: str) -> Optional[Dict[str, Any]]:
        """Return the persona with the given id, or None. No user filter is applied."""
        from .datamodelCommcoach import CoachingPersona

        records = self.db.getRecordset(CoachingPersona, recordFilter={"id": personaId})
        return records[0] if records else None

    def createPersona(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with createdAt/updatedAt and create the persona."""
        from .datamodelCommcoach import CoachingPersona

        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingPersona, data)

    def updatePersona(self, personaId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stamp `updates` (in place) with updatedAt and apply them to the persona."""
        from .datamodelCommcoach import CoachingPersona

        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingPersona, personaId, updates)

    def deletePersona(self, personaId: str) -> bool:
        """Delete the persona with the given id."""
        from .datamodelCommcoach import CoachingPersona

        return self.db.recordDelete(CoachingPersona, personaId)

    def getAllPersonas(self, instanceId: str) -> List[Dict[str, Any]]:
        """All personas (builtin + custom for this instance), including inactive."""
        from .datamodelCommcoach import CoachingPersona

        builtins = self.db.getRecordset(CoachingPersona, recordFilter={"userId": "system"})
        custom = self.db.getRecordset(CoachingPersona, recordFilter={"instanceId": instanceId})
        # Built-ins that also match the instance filter must not be listed twice.
        custom = [p for p in custom if p.get("userId") != "system"]
        return builtins + custom

    # =========================================================================
    # Module-Persona Mapping
    # =========================================================================

    def getModulePersonas(self, moduleId: str) -> List[Dict[str, Any]]:
        """Get all module-persona mapping records of a module."""
        from .datamodelCommcoach import ModulePersonaMapping

        return self.db.getRecordset(ModulePersonaMapping, recordFilter={"moduleId": moduleId})

    def setModulePersonas(self, moduleId: str, personaIds: List[str], instanceId: str) -> List[Dict[str, Any]]:
        """Replace a module's persona mappings with one mapping per id in `personaIds`.

        Existing mappings of the module are deleted first. Returns the newly
        created mapping records in the order of `personaIds`.
        """
        from .datamodelCommcoach import ModulePersonaMapping

        self._deleteModulePersonasByModule(moduleId)

        created = []
        for personaId in personaIds:
            data = ModulePersonaMapping(
                moduleId=moduleId,
                personaId=personaId,
                instanceId=instanceId,
            ).model_dump()
            now = getIsoTimestamp()
            data["createdAt"] = now
            data["updatedAt"] = now
            created.append(self.db.recordCreate(ModulePersonaMapping, data))
        return created

    def _deleteModulePersonasByModule(self, moduleId: str) -> int:
        """Delete all persona mappings of a module; return how many were processed."""
        from .datamodelCommcoach import ModulePersonaMapping

        return self._deleteWhere(ModulePersonaMapping, {"moduleId": moduleId})

    # =========================================================================
    # Badges
    # =========================================================================

    def getBadges(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Get the user's badges for an instance, most recently awarded first."""
        from .datamodelCommcoach import CoachingBadge

        records = self.db.getRecordset(
            CoachingBadge,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        records.sort(key=lambda r: r.get("awardedAt") or 0, reverse=True)
        return records

    def hasBadge(self, userId: str, instanceId: str, badgeKey: str) -> bool:
        """Return True if the user already holds the badge `badgeKey` in this instance."""
        from .datamodelCommcoach import CoachingBadge

        records = self.db.getRecordset(
            CoachingBadge,
            recordFilter={"userId": userId, "instanceId": instanceId, "badgeKey": badgeKey},
        )
        return len(records) > 0

    def awardBadge(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Stamp `data` (in place) with awardedAt/createdAt and create the badge."""
        from .datamodelCommcoach import CoachingBadge

        data["awardedAt"] = getUtcTimestamp()
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingBadge, data)

    # =========================================================================
    # Score History
    # =========================================================================

    def getScoreHistory(self, moduleId: str, userId: str) -> Dict[str, List[Dict[str, Any]]]:
        """Group a user's module scores by dimension, each group oldest first.

        Scores without a ``dimension`` key are grouped under "unknown".
        """
        history: Dict[str, List[Dict[str, Any]]] = {}
        for score in self.getScores(moduleId, userId):
            dimension = score.get("dimension", "unknown")
            history.setdefault(dimension, []).append({
                "score": score.get("score"),
                "trend": score.get("trend"),
                "evidence": score.get("evidence"),
                "createdAt": score.get("createdAt"),
                "sessionId": score.get("sessionId"),
            })
        for entries in history.values():
            entries.sort(key=lambda x: x.get("createdAt") or "")
        return history

    # =========================================================================
    # User Profile
    # =========================================================================

    def getProfile(self, userId: str, instanceId: str) -> Optional[Dict[str, Any]]:
        """Return the user's profile for an instance, or None if there is none."""
        records = self.db.getRecordset(
            CoachingUserProfile,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        return records[0] if records else None

    def getOrCreateProfile(self, userId: str, mandateId: str, instanceId: str) -> Dict[str, Any]:
        """Return the user's profile for an instance, creating a default one if missing."""
        existing = self.getProfile(userId, instanceId)
        if existing:
            return existing

        data = CoachingUserProfile(
            userId=userId,
            mandateId=mandateId,
            instanceId=instanceId,
        ).model_dump()
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingUserProfile, data)

    def updateProfile(self, profileId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Stamp `updates` (in place) with updatedAt and apply them to the profile."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingUserProfile, profileId, updates)

    # =========================================================================
    # Dashboard Aggregation
    # =========================================================================

    def getDashboardData(self, userId: str, instanceId: str) -> Dict[str, Any]:
        """Aggregate the figures shown on the user's dashboard for one instance.

        Returns a dict with module/session totals, total minutes, streak data
        from the profile, the average of all parseable session
        ``competenceScore`` values (rounded to one decimal, None if there are
        none), recent scores, task counts, per-module summaries for ACTIVE
        modules, badges and the level derived from the session count.
        """
        modules = self.db.getRecordset(
            TrainingModule,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        sessions = self.db.getRecordset(
            CoachingSession,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        profile = self.getProfile(userId, instanceId)

        activeStatus = TrainingModuleStatus.ACTIVE.value
        activeModules = [m for m in modules if m.get("status") == activeStatus]

        # `or 0` also covers sessions whose durationSeconds is stored as None.
        totalMinutes = sum((s.get("durationSeconds") or 0) for s in sessions) // 60

        scores = []
        for session in sessions:
            raw = session.get("competenceScore")
            if raw is None:
                continue
            try:
                scores.append(float(raw))
            except (ValueError, TypeError):
                # Unparseable scores are deliberately left out of the average.
                pass
        avgScore = sum(scores) / len(scores) if scores else None

        recentScores = self.getRecentScores(userId, limit=10)

        countByModule = self._countSessionsByModule(sessions)
        moduleSummaries = []
        for mod in activeModules:
            modId = mod.get("id", "")
            moduleSummaries.append({
                "id": modId,
                "title": mod.get("title"),
                "moduleType": mod.get("moduleType"),
                "sessionCount": countByModule.get(modId, 0),
                "lastSessionAt": mod.get("lastSessionAt"),
            })

        return {
            "totalModules": len(modules),
            "activeModules": len(activeModules),
            "totalSessions": len(sessions),
            "totalMinutes": totalMinutes,
            "streakDays": profile.get("streakDays", 0) if profile else 0,
            "longestStreak": profile.get("longestStreak", 0) if profile else 0,
            # Explicit None check: an average of exactly 0.0 is a real value.
            "averageScore": round(avgScore, 1) if avgScore is not None else None,
            "recentScores": recentScores,
            "openTasks": self.getOpenTaskCount(userId, instanceId),
            "completedTasks": self.getCompletedTaskCount(userId, instanceId),
            "modules": moduleSummaries,
            "badges": self.getBadges(userId, instanceId),
            "level": _calcLevel(len(sessions)),
        }


# (minimum session count, level number, level code, label key).
# Ordered from highest threshold to lowest: _calcLevel returns the first match.
_LEVELS = [
    (50, 5, "master", "Meister"),
    (25, 4, "expert", "Experte"),
    (10, 3, "advanced", "Fortgeschritten"),
    (3, 2, "engaged", "Engagiert"),
]

# NOTE(review): presumably registers the label keys with the i18n registry so
# resolveText() can translate them -- confirm against i18nRegistry.
t("Meister")
t("Experte")
t("Fortgeschritten")
t("Engagiert")
t("Einsteiger")


def _calcLevel(totalSessions: int) -> Dict[str, Any]:
    """Map a total session count to the user's level.

    Returns a dict with ``number``, ``code``, the resolved ``label`` and the
    ``totalSessions`` passed in. Counts below the lowest threshold in _LEVELS
    give level 1 ("beginner").
    """
    for threshold, number, code, labelKey in _LEVELS:
        if totalSessions >= threshold:
            return {
                "number": number,
                "code": code,
                "label": resolveText(labelKey),
                "totalSessions": totalSessions,
            }
    return {
        "number": 1,
        "code": "beginner",
        "label": resolveText("Einsteiger"),
        "totalSessions": totalSessions,
    }