# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Interface to Teamsbot database. Uses the PostgreSQL connector for data access with user/mandate filtering. """ import logging from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.timeUtils import getIsoTimestamp from modules.shared.configuration import APP_CONFIG from .datamodelTeamsbot import ( TeamsbotSession, TeamsbotSessionStatus, TeamsbotTranscript, TeamsbotBotResponse, TeamsbotSystemBot, TeamsbotUserSettings, TeamsbotUserAccount, ) logger = logging.getLogger(__name__) # Singleton factory _interfaces = {} def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None): """Factory: get or create a TeamsbotObjects interface instance.""" key = f"{currentUser.id}_{mandateId}_{featureInstanceId}" if key not in _interfaces: _interfaces[key] = TeamsbotObjects(currentUser, mandateId, featureInstanceId) else: _interfaces[key].currentUser = currentUser _interfaces[key].mandateId = mandateId _interfaces[key].featureInstanceId = featureInstanceId return _interfaces[key] class TeamsbotObjects: """Database interface for Teams Bot feature.""" def __init__(self, currentUser: User, mandateId: str = None, featureInstanceId: str = None): self.currentUser = currentUser self.mandateId = mandateId self.featureInstanceId = featureInstanceId self.userId = str(currentUser.id) if currentUser else "system" dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data") dbDatabase = "poweron_teamsbot" dbUser = APP_CONFIG.get("DB_USER") dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET") dbPort = int(APP_CONFIG.get("DB_PORT", 5432)) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword, dbPort=dbPort, userId=self.userId, ) # ========================================================================= # Sessions # ========================================================================= def getSessions(self, instanceId: str, includeEnded: bool = True, userId: str | None = None) -> List[Dict[str, Any]]: """Get sessions for a feature instance, optionally filtered by owner.""" recordFilter = {"instanceId": instanceId} if userId: recordFilter["startedByUserId"] = userId records = self.db.getRecordset( TeamsbotSession, recordFilter=recordFilter, ) if not includeEnded: records = [r for r in records if r.get("status") != TeamsbotSessionStatus.ENDED.value] # Sort by startedAt descending records.sort(key=lambda r: r.get("startedAt") or "", reverse=True) return records def getActiveSessions(self, instanceId: str) -> List[Dict[str, Any]]: """Get only active (non-ended, non-error) sessions.""" records = self.db.getRecordset( TeamsbotSession, recordFilter={"instanceId": instanceId}, ) activeStatuses = { TeamsbotSessionStatus.PENDING.value, TeamsbotSessionStatus.JOINING.value, TeamsbotSessionStatus.ACTIVE.value, } return [r for r in records if r.get("status") in activeStatuses] def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]: """Get a single session by ID.""" records = self.db.getRecordset(TeamsbotSession, recordFilter={"id": sessionId}) return records[0] if records else None def createSession(self, sessionData: Dict[str, Any]) -> Dict[str, Any]: """Create a new session.""" sessionData["creationDate"] = getIsoTimestamp() sessionData["lastModified"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotSession, sessionData) def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update session fields.""" updates["lastModified"] = getIsoTimestamp() return self.db.recordModify(TeamsbotSession, sessionId, updates) def deleteSession(self, sessionId: str) -> bool: """Delete a session and all related transcripts and responses.""" # Delete related records first self._deleteTranscriptsBySession(sessionId) self._deleteResponsesBySession(sessionId) # Delete session return self.db.recordDelete(TeamsbotSession, sessionId) # ========================================================================= # Transcripts # ========================================================================= def getTranscripts(self, sessionId: str, limit: int = None, offset: int = None) -> List[Dict[str, Any]]: """Get transcript segments for a session, ordered by timestamp.""" records = self.db.getRecordset( TeamsbotTranscript, recordFilter={"sessionId": sessionId}, ) records.sort(key=lambda r: r.get("timestamp") or "") if offset: records = records[offset:] if limit: records = records[:limit] return records def getRecentTranscripts(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]: """Get the most recent N transcript segments for context building.""" records = self.db.getRecordset( TeamsbotTranscript, recordFilter={"sessionId": sessionId}, ) records.sort(key=lambda r: r.get("timestamp") or "") return records[-count:] def createTranscript(self, transcriptData: Dict[str, Any]) -> Dict[str, Any]: """Create a new transcript segment.""" transcriptData["creationDate"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotTranscript, transcriptData) def updateTranscript(self, transcriptId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update an existing transcript segment (used for differential writing).""" return self.db.recordModify(TeamsbotTranscript, transcriptId, updates) def _deleteTranscriptsBySession(self, sessionId: str) -> int: """Delete all transcripts for a session.""" records = self.db.getRecordset(TeamsbotTranscript, recordFilter={"sessionId": sessionId}) count = 0 for record in records: self.db.recordDelete(TeamsbotTranscript, record.get("id")) count += 1 return count # ========================================================================= # Bot Responses # ========================================================================= def getBotResponses(self, sessionId: str) -> List[Dict[str, Any]]: """Get all bot responses for a session.""" records = self.db.getRecordset( TeamsbotBotResponse, recordFilter={"sessionId": sessionId}, ) records.sort(key=lambda r: r.get("timestamp") or "") return records def createBotResponse(self, responseData: Dict[str, Any]) -> Dict[str, Any]: """Create a new bot response record.""" responseData["creationDate"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotBotResponse, responseData) def _deleteResponsesBySession(self, sessionId: str) -> int: """Delete all bot responses for a session.""" records = self.db.getRecordset(TeamsbotBotResponse, recordFilter={"sessionId": sessionId}) count = 0 for record in records: self.db.recordDelete(TeamsbotBotResponse, record.get("id")) count += 1 return count # ========================================================================= # System Bots (mandate-scoped, admin-managed) # ========================================================================= def getSystemBots(self, mandateId: str) -> List[Dict[str, Any]]: """Get all system bot accounts for a mandate.""" records = self.db.getRecordset(TeamsbotSystemBot, recordFilter={"mandateId": mandateId}) # Strip encrypted passwords from returned records for r in records: r.pop("encryptedPassword", None) return records def getSystemBot(self, botId: str) -> Optional[Dict[str, Any]]: """Get a system bot by ID (includes encrypted password for internal use).""" records = self.db.getRecordset(TeamsbotSystemBot, recordFilter={"id": botId}) return records[0] if records else None def getActiveSystemBot(self, mandateId: str) -> Optional[Dict[str, Any]]: """Get the first active system bot for a mandate (includes encrypted password).""" records = self.db.getRecordset(TeamsbotSystemBot, recordFilter={"mandateId": mandateId, "isActive": True}) return records[0] if records else None def createSystemBot(self, botData: Dict[str, Any]) -> Dict[str, Any]: """Create a new system bot account.""" botData["creationDate"] = getIsoTimestamp() botData["lastModified"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotSystemBot, botData) def updateSystemBot(self, botId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update a system bot account.""" updates["lastModified"] = getIsoTimestamp() return self.db.recordModify(TeamsbotSystemBot, botId, updates) def deleteSystemBot(self, botId: str) -> bool: """Delete a system bot account.""" return self.db.recordDelete(TeamsbotSystemBot, botId) # ========================================================================= # User Settings (per-user per-instance) # ========================================================================= def getUserSettings(self, userId: str, instanceId: str) -> Optional[Dict[str, Any]]: """Get user-specific settings for a feature instance.""" records = self.db.getRecordset( TeamsbotUserSettings, recordFilter={"userId": userId, "instanceId": instanceId}, ) return records[0] if records else None def createUserSettings(self, settingsData: Dict[str, Any]) -> Dict[str, Any]: """Create user settings.""" settingsData["creationDate"] = getIsoTimestamp() settingsData["lastModified"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotUserSettings, settingsData) def updateUserSettings(self, settingsId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update user settings.""" updates["lastModified"] = getIsoTimestamp() return self.db.recordModify(TeamsbotUserSettings, settingsId, updates) def deleteUserSettings(self, settingsId: str) -> bool: """Delete user settings (reset to defaults).""" return self.db.recordDelete(TeamsbotUserSettings, settingsId) # ========================================================================= # User Account Credentials (per-user per-mandate) # ========================================================================= def getUserAccount(self, userId: str, mandateId: str) -> Optional[Dict[str, Any]]: """Get saved MS credentials for a user in a mandate.""" records = self.db.getRecordset( TeamsbotUserAccount, recordFilter={"userId": userId, "mandateId": mandateId}, ) return records[0] if records else None def createUserAccount(self, data: Dict[str, Any]) -> Dict[str, Any]: """Create saved MS credentials.""" data["creationDate"] = getIsoTimestamp() data["lastModified"] = getIsoTimestamp() return self.db.recordCreate(TeamsbotUserAccount, data) def updateUserAccount(self, accountId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update saved MS credentials.""" updates["lastModified"] = getIsoTimestamp() return self.db.recordModify(TeamsbotUserAccount, accountId, updates) def deleteUserAccount(self, accountId: str) -> bool: """Delete saved MS credentials.""" return self.db.recordDelete(TeamsbotUserAccount, accountId) # ========================================================================= # Stats / Aggregation # ========================================================================= def getSessionStats(self, sessionId: str) -> Dict[str, Any]: """Get aggregated statistics for a session.""" transcripts = self.db.getRecordset(TeamsbotTranscript, recordFilter={"sessionId": sessionId}) responses = self.db.getRecordset(TeamsbotBotResponse, recordFilter={"sessionId": sessionId}) totalCost = sum(r.get("priceCHF", 0) for r in responses) totalProcessingTime = sum(r.get("processingTime", 0) for r in responses) return { "transcriptSegments": len(transcripts), "botResponses": len(responses), "totalCostCHF": round(totalCost, 4), "totalProcessingTime": round(totalProcessingTime, 2), "speakers": list(set(t.get("speaker") for t in transcripts if t.get("speaker"))), }