gateway/modules/features/commcoach/interfaceFeatureCommcoach.py
2026-04-16 23:13:05 +02:00

437 lines
19 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to CommCoach database.
Uses the PostgreSQL connector for data access with strict user ownership.
"""
import json
import logging
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.dbRegistry import registerDatabase
from modules.shared.timeUtils import getIsoTimestamp
from modules.shared.configuration import APP_CONFIG
from modules.shared.i18nRegistry import resolveText, t
from .datamodelCommcoach import (
CoachingContext, CoachingContextStatus,
CoachingSession, CoachingSessionStatus,
CoachingMessage,
CoachingTask, CoachingTaskStatus,
CoachingScore,
CoachingUserProfile,
)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Name of the database used by the CommCoach feature; passed as dbDatabase to
# the PostgreSQL DatabaseConnector in CommcoachObjects.__init__.
commcoachDatabase = "poweron_commcoach"
# Hand the database name to the shared registry at import time.
# NOTE(review): presumably this makes the database known to provisioning/bootstrap
# code — confirm in modules.shared.dbRegistry.
registerDatabase(commcoachDatabase)
# Cache of CommcoachObjects instances filled by getInterface(), keyed by
# "<userId>_<mandateId>_<featureInstanceId>". Entries are never removed here.
_interfaces: Dict[str, "CommcoachObjects"] = {}
def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None):
    """Factory: get or create a CommcoachObjects interface instance.

    Instances are cached in the module-level ``_interfaces`` dict under the key
    ``"<userId>_<mandateId>_<featureInstanceId>"``. On a cache hit the stored
    instance is refreshed with the arguments of this call before it is returned.

    Args:
        currentUser: User the interface acts for. A missing user is keyed as
            "system", matching the fallback used by CommcoachObjects.__init__.
        mandateId: Optional mandate identifier; part of the cache key.
        featureInstanceId: Optional feature-instance identifier; part of the cache key.

    Returns:
        The cached or newly created CommcoachObjects instance.
    """
    # CommcoachObjects.__init__ tolerates a missing user, so the key must not
    # dereference currentUser.id unconditionally.
    userKey = currentUser.id if currentUser else "system"
    key = f"{userKey}_{mandateId}_{featureInstanceId}"
    interface = _interfaces.get(key)
    if interface is None:
        interface = CommcoachObjects(currentUser, mandateId, featureInstanceId)
        _interfaces[key] = interface
    else:
        # Refresh the request-scoped attributes on the cached instance.
        interface.currentUser = currentUser
        interface.mandateId = mandateId
        interface.featureInstanceId = featureInstanceId
    return interface
class CommcoachObjects:
    """Database interface for the CommCoach feature.

    Every method is a thin wrapper around the ``DatabaseConnector`` held in
    ``self.db``. List-style reads (``getContexts``, ``getSessions``,
    ``getTasks``, ``getScores``, ...) filter by the ``userId`` the caller passes
    in. Lookups by primary key (``getContext``, ``getSession``, ``getTask``,
    ``getPersona``) and all update/delete methods filter by id only, so callers
    must verify ownership of the affected record themselves.

    The ``create*``/``update*`` methods stamp timestamps directly into the dict
    passed by the caller (the dict is mutated in place).
    """

    def __init__(self, currentUser: Optional["User"], mandateId: Optional[str] = None, featureInstanceId: Optional[str] = None):
        """Store the request scope and open a connector to the CommCoach database.

        Connection settings (DB_HOST, DB_USER, DB_PASSWORD_SECRET, DB_PORT) are
        read from APP_CONFIG. Without a user the connector runs as "system".
        """
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = str(currentUser.id) if currentUser else "system"
        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = commcoachDatabase
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))
        self.db = DatabaseConnector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )

    # =========================================================================
    # Contexts
    # =========================================================================
    def getContexts(self, instanceId: str, userId: str, includeArchived: bool = False) -> List[Dict[str, Any]]:
        """Get all coaching contexts of a user in an instance, most recently touched first.

        Archived contexts are dropped unless ``includeArchived`` is true.
        """
        records = self.db.getRecordset(
            CoachingContext,
            recordFilter={"instanceId": instanceId, "userId": userId},
        )
        if not includeArchived:
            records = [r for r in records if r.get("status") != CoachingContextStatus.ARCHIVED.value]
        records.sort(key=lambda r: r.get("updatedAt") or r.get("createdAt") or "", reverse=True)
        return records

    def getContext(self, contextId: str) -> Optional[Dict[str, Any]]:
        """Return the context with this id, or None. No ownership filter is applied."""
        records = self.db.getRecordset(CoachingContext, recordFilter={"id": contextId})
        return records[0] if records else None

    def createContext(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a context; stamps createdAt/updatedAt into ``data``."""
        # One timestamp for both fields so a fresh record never shows createdAt != updatedAt.
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingContext, data)

    def updateContext(self, contextId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply ``updates`` to a context; stamps updatedAt into ``updates``."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingContext, contextId, updates)

    def deleteContext(self, contextId: str) -> bool:
        """Delete a context after removing its sessions (with messages), tasks and scores."""
        self._deleteSessionsByContext(contextId)
        self._deleteTasksByContext(contextId)
        self._deleteScoresByContext(contextId)
        return self.db.recordDelete(CoachingContext, contextId)

    # =========================================================================
    # Sessions
    # =========================================================================
    def getSessions(self, contextId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's sessions for a context, newest first (by startedAt, then createdAt)."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={"contextId": contextId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("startedAt") or r.get("createdAt") or "", reverse=True)
        return records

    def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]:
        """Return the session with this id, or None. No ownership filter is applied."""
        records = self.db.getRecordset(CoachingSession, recordFilter={"id": sessionId})
        return records[0] if records else None

    def getActiveSession(self, contextId: str, userId: str) -> Optional[Dict[str, Any]]:
        """Return the first session of the user in this context whose status is ACTIVE, or None."""
        records = self.db.getRecordset(
            CoachingSession,
            recordFilter={"contextId": contextId, "userId": userId, "status": CoachingSessionStatus.ACTIVE.value},
        )
        return records[0] if records else None

    def createSession(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a session; stamps createdAt/updatedAt/startedAt into ``data``."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        data["startedAt"] = now
        return self.db.recordCreate(CoachingSession, data)

    def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply ``updates`` to a session; stamps updatedAt into ``updates``."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingSession, sessionId, updates)

    def _deleteSessionsByContext(self, contextId: str) -> int:
        """Delete every session of a context together with its messages; return the session count."""
        records = self.db.getRecordset(CoachingSession, recordFilter={"contextId": contextId})
        count = 0
        for record in records:
            # Messages first, so no message is left pointing at a deleted session.
            self._deleteMessagesBySession(record.get("id"))
            self.db.recordDelete(CoachingSession, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Messages
    # =========================================================================
    def getMessages(self, sessionId: str) -> List[Dict[str, Any]]:
        """Get all messages of a session, oldest first."""
        records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId})
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentMessages(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]:
        """Get the last ``count`` messages of a session, oldest first.

        A ``count`` of zero or less yields an empty list.
        """
        if count <= 0:
            # records[-0:] would be the whole list, not "no messages".
            return []
        records = self.getMessages(sessionId)
        return records[-count:]

    def createMessage(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a message; stamps createdAt into ``data``."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingMessage, data)

    def _deleteMessagesBySession(self, sessionId: str) -> int:
        """Delete every message of a session; return how many were deleted."""
        records = self.db.getRecordset(CoachingMessage, recordFilter={"sessionId": sessionId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingMessage, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Tasks
    # =========================================================================
    def getTasks(self, contextId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's tasks for a context, newest first."""
        records = self.db.getRecordset(
            CoachingTask,
            recordFilter={"contextId": contextId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        return records

    def getTask(self, taskId: str) -> Optional[Dict[str, Any]]:
        """Return the task with this id, or None. No ownership filter is applied."""
        records = self.db.getRecordset(CoachingTask, recordFilter={"id": taskId})
        return records[0] if records else None

    def createTask(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a task; stamps createdAt/updatedAt into ``data``."""
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingTask, data)

    def updateTask(self, taskId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply ``updates`` to a task; stamps updatedAt into ``updates``."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingTask, taskId, updates)

    def deleteTask(self, taskId: str) -> bool:
        """Delete the task with this id."""
        return self.db.recordDelete(CoachingTask, taskId)

    def _deleteTasksByContext(self, contextId: str) -> int:
        """Delete every task of a context; return how many were deleted."""
        records = self.db.getRecordset(CoachingTask, recordFilter={"contextId": contextId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingTask, record.get("id"))
            count += 1
        return count

    def getOpenTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is OPEN or IN_PROGRESS.

        NOTE(review): ``instanceId`` is accepted but not used, so the count spans
        all instances of the user — confirm whether CoachingTask carries an
        instanceId that should be filtered on.
        """
        openStates = (CoachingTaskStatus.OPEN.value, CoachingTaskStatus.IN_PROGRESS.value)
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        return sum(1 for r in records if r.get("status") in openStates)

    def getCompletedTaskCount(self, userId: str, instanceId: str) -> int:
        """Count the user's tasks whose status is DONE.

        NOTE(review): ``instanceId`` is accepted but not used — see getOpenTaskCount.
        """
        records = self.db.getRecordset(CoachingTask, recordFilter={"userId": userId})
        return sum(1 for r in records if r.get("status") == CoachingTaskStatus.DONE.value)

    # =========================================================================
    # Scores
    # =========================================================================
    def getScores(self, contextId: str, userId: str) -> List[Dict[str, Any]]:
        """Get a user's scores for a context, oldest first."""
        records = self.db.getRecordset(
            CoachingScore,
            recordFilter={"contextId": contextId, "userId": userId},
        )
        records.sort(key=lambda r: r.get("createdAt") or "")
        return records

    def getRecentScores(self, userId: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Get up to ``limit`` of the user's newest scores across all contexts, newest first."""
        records = self.db.getRecordset(CoachingScore, recordFilter={"userId": userId})
        records.sort(key=lambda r: r.get("createdAt") or "", reverse=True)
        # A negative limit would otherwise drop rows from the end instead of limiting.
        return records[:max(limit, 0)]

    def createScore(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a score; stamps createdAt into ``data``."""
        data["createdAt"] = getIsoTimestamp()
        return self.db.recordCreate(CoachingScore, data)

    def _deleteScoresByContext(self, contextId: str) -> int:
        """Delete every score of a context; return how many were deleted."""
        records = self.db.getRecordset(CoachingScore, recordFilter={"contextId": contextId})
        count = 0
        for record in records:
            self.db.recordDelete(CoachingScore, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Personas
    # =========================================================================
    def getPersonas(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Get active personas: those owned by "system" followed by the user's own for this instance.

        Records without an ``isActive`` field count as active.
        """
        from .datamodelCommcoach import CoachingPersona
        builtinPersonas = self.db.getRecordset(CoachingPersona, recordFilter={"userId": "system"})
        customPersonas = self.db.getRecordset(CoachingPersona, recordFilter={"userId": userId, "instanceId": instanceId})
        return [p for p in builtinPersonas + customPersonas if p.get("isActive", True)]

    def getPersona(self, personaId: str) -> Optional[Dict[str, Any]]:
        """Return the persona with this id, or None. No ownership filter is applied."""
        from .datamodelCommcoach import CoachingPersona
        records = self.db.getRecordset(CoachingPersona, recordFilter={"id": personaId})
        return records[0] if records else None

    def createPersona(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a persona; stamps createdAt/updatedAt into ``data``."""
        from .datamodelCommcoach import CoachingPersona
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingPersona, data)

    def updatePersona(self, personaId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply ``updates`` to a persona; stamps updatedAt into ``updates``."""
        from .datamodelCommcoach import CoachingPersona
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingPersona, personaId, updates)

    def deletePersona(self, personaId: str) -> bool:
        """Delete the persona with this id."""
        from .datamodelCommcoach import CoachingPersona
        return self.db.recordDelete(CoachingPersona, personaId)

    # =========================================================================
    # Badges
    # =========================================================================
    def getBadges(self, userId: str, instanceId: str) -> List[Dict[str, Any]]:
        """Get a user's badges in an instance, most recently awarded first."""
        from .datamodelCommcoach import CoachingBadge
        records = self.db.getRecordset(CoachingBadge, recordFilter={"userId": userId, "instanceId": instanceId})
        records.sort(key=lambda r: r.get("awardedAt") or "", reverse=True)
        return records

    def hasBadge(self, userId: str, instanceId: str, badgeKey: str) -> bool:
        """Tell whether the user already holds the badge ``badgeKey`` in this instance."""
        from .datamodelCommcoach import CoachingBadge
        records = self.db.getRecordset(CoachingBadge, recordFilter={"userId": userId, "instanceId": instanceId, "badgeKey": badgeKey})
        return len(records) > 0

    def awardBadge(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Create a badge record; stamps awardedAt/createdAt into ``data``."""
        from .datamodelCommcoach import CoachingBadge
        now = getIsoTimestamp()
        data["awardedAt"] = now
        data["createdAt"] = now
        return self.db.recordCreate(CoachingBadge, data)

    # =========================================================================
    # Score History
    # =========================================================================
    def getScoreHistory(self, contextId: str, userId: str) -> Dict[str, List[Dict[str, Any]]]:
        """Group a user's scores for a context by dimension, each group oldest first.

        Scores without a ``dimension`` are collected under "unknown".
        """
        history: Dict[str, List[Dict[str, Any]]] = {}
        for s in self.getScores(contextId, userId):
            history.setdefault(s.get("dimension", "unknown"), []).append({
                "score": s.get("score"),
                "trend": s.get("trend"),
                "evidence": s.get("evidence"),
                "createdAt": s.get("createdAt"),
                "sessionId": s.get("sessionId"),
            })
        for entries in history.values():
            entries.sort(key=lambda x: x.get("createdAt") or "")
        return history

    # =========================================================================
    # User Profile
    # =========================================================================
    def getProfile(self, userId: str, instanceId: str) -> Optional[Dict[str, Any]]:
        """Return the user's profile in this instance, or None."""
        records = self.db.getRecordset(
            CoachingUserProfile,
            recordFilter={"userId": userId, "instanceId": instanceId},
        )
        return records[0] if records else None

    def getOrCreateProfile(self, userId: str, mandateId: str, instanceId: str) -> Dict[str, Any]:
        """Return the user's profile in this instance, creating one with model defaults if missing."""
        existing = self.getProfile(userId, instanceId)
        if existing:
            return existing
        data = CoachingUserProfile(
            userId=userId,
            mandateId=mandateId,
            instanceId=instanceId,
        ).model_dump()
        now = getIsoTimestamp()
        data["createdAt"] = now
        data["updatedAt"] = now
        return self.db.recordCreate(CoachingUserProfile, data)

    def updateProfile(self, profileId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Apply ``updates`` to a profile; stamps updatedAt into ``updates``."""
        updates["updatedAt"] = getIsoTimestamp()
        return self.db.recordModify(CoachingUserProfile, profileId, updates)

    # =========================================================================
    # Dashboard Aggregation
    # =========================================================================
    @staticmethod
    def _totalDurationMinutes(sessions):
        """Sum ``durationSeconds`` over the sessions and convert to whole minutes."""
        # "or 0" covers both a missing key and a stored None.
        return sum((s.get("durationSeconds") or 0) for s in sessions) // 60

    @staticmethod
    def _averageCompetenceScore(sessions) -> Optional[float]:
        """Average the numeric ``competenceScore`` values of the sessions, or None if there are none."""
        values = []
        for s in sessions:
            raw = s.get("competenceScore")
            if raw is None:
                continue
            try:
                values.append(float(raw))
            except (ValueError, TypeError):
                # Best effort: a score that cannot be read as a number is left out of the average.
                continue
        return sum(values) / len(values) if values else None

    def getDashboardData(self, userId: str, instanceId: str) -> Dict[str, Any]:
        """Aggregate the dashboard figures for a user in an instance.

        Combines context/session counts, practice minutes, average competence
        score, recent scores, task counts, per-context summaries, goal progress,
        badges, streaks (from the profile) and the level derived from the
        profile's ``totalSessions``.
        """
        contexts = self.db.getRecordset(CoachingContext, recordFilter={"userId": userId, "instanceId": instanceId})
        sessions = self.db.getRecordset(CoachingSession, recordFilter={"userId": userId, "instanceId": instanceId})
        profile = self.getProfile(userId, instanceId)
        activeContexts = [c for c in contexts if c.get("status") == CoachingContextStatus.ACTIVE.value]
        completedSessions = [s for s in sessions if s.get("status") == CoachingSessionStatus.COMPLETED.value]
        totalMinutes = self._totalDurationMinutes(completedSessions)
        avgScore = self._averageCompetenceScore(completedSessions)
        recentScores = self.getRecentScores(userId, limit=10)

        # One pass: build the per-context summary and collect the progress values for the overall mean.
        contextSummaries = []
        goalProgressValues = []
        for ctx in activeContexts:
            goalProgress = _calcGoalProgress(ctx.get("goals"))
            contextSummaries.append({
                "id": ctx.get("id"),
                "title": ctx.get("title"),
                "category": ctx.get("category"),
                "sessionCount": ctx.get("sessionCount", 0),
                "lastSessionAt": ctx.get("lastSessionAt"),
                "goalProgress": goalProgress,
            })
            if goalProgress is not None:
                goalProgressValues.append(goalProgress)
        overallGoalProgress = round(sum(goalProgressValues) / len(goalProgressValues)) if goalProgressValues else None

        # A stored None must not reach the threshold comparison in _calcLevel.
        profileSessions = (profile.get("totalSessions") or 0) if profile else 0
        return {
            "totalContexts": len(contexts),
            "activeContexts": len(activeContexts),
            "totalSessions": len(completedSessions),
            "totalMinutes": totalMinutes,
            "streakDays": profile.get("streakDays", 0) if profile else 0,
            "longestStreak": profile.get("longestStreak", 0) if profile else 0,
            # "is not None" keeps a genuine average of 0.0 instead of reporting "no score".
            "averageScore": round(avgScore, 1) if avgScore is not None else None,
            "recentScores": recentScores,
            "openTasks": self.getOpenTaskCount(userId, instanceId),
            "completedTasks": self.getCompletedTaskCount(userId, instanceId),
            "contexts": contextSummaries,
            "goalProgress": overallGoalProgress,
            "badges": self.getBadges(userId, instanceId),
            "level": _calcLevel(profileSessions),
        }
def _calcGoalProgress(goalsRaw) -> Optional[int]:
    """Return the share of finished goals as a rounded percentage.

    ``goalsRaw`` is either a list of goals or a JSON string encoding such a
    list. A goal counts as finished when it is a dict whose ``status`` is
    "done" or "completed"; every list entry counts towards the total.
    Returns None for empty input, unparsable JSON, or anything that is not a
    non-empty list.
    """
    if not goalsRaw:
        return None
    if isinstance(goalsRaw, str):
        try:
            parsed = json.loads(goalsRaw)
        except (json.JSONDecodeError, TypeError):
            return None
    else:
        parsed = goalsRaw
    if not (isinstance(parsed, list) and parsed):
        return None
    finished = 0
    for goal in parsed:
        if isinstance(goal, dict) and goal.get("status") in ("done", "completed"):
            finished += 1
    return round(finished / len(parsed) * 100)
# Level table read by _calcLevel. Each row is
# (minimum total sessions, level number, level code, label key).
# Rows run from the highest threshold down because _calcLevel returns the
# first row whose threshold is reached.
_LEVELS = [
(50, 5, "master", "Meister"),
(25, 4, "expert", "Experte"),
(10, 3, "advanced", "Fortgeschritten"),
(3, 2, "engaged", "Engagiert"),
]
# The level labels are passed through t() once at import time; _calcLevel
# later turns the same strings into display text via resolveText().
# NOTE(review): presumably t() registers each literal with the i18n registry —
# confirm in modules.shared.i18nRegistry.
t("Meister")
t("Experte")
t("Fortgeschritten")
t("Engagiert")
t("Einsteiger")
def _calcLevel(totalSessions: int) -> Dict[str, Any]:
    """Map a session total to its level descriptor.

    Picks the first row of ``_LEVELS`` whose threshold is reached and falls
    back to level 1 ("beginner") when none is. The result holds the level
    ``number``, its ``code``, the ``label`` resolved through resolveText(),
    and the ``totalSessions`` value that was passed in.
    """
    # Start from the fallback level and overwrite it with the first matching row.
    number, code, labelKey = 1, "beginner", "Einsteiger"
    for threshold, levelNumber, levelCode, levelLabel in _LEVELS:
        if totalSessions >= threshold:
            number, code, labelKey = levelNumber, levelCode, levelLabel
            break
    return {"number": number, "code": code, "label": resolveText(labelKey), "totalSessions": totalSessions}