# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Interface to Teamsbot database. Uses the PostgreSQL connector for data access with user/mandate filtering. """ import logging from typing import Dict, Any, List, Optional from modules.datamodels.datamodelUam import User from modules.connectors.connectorDbPostgre import DatabaseConnector from modules.shared.timeUtils import getUtcTimestamp from modules.shared.configuration import APP_CONFIG from .datamodelTeamsbot import ( TeamsbotSession, TeamsbotSessionStatus, TeamsbotTranscript, TeamsbotBotResponse, ) logger = logging.getLogger(__name__) # Singleton factory _interfaces = {} def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None): """Factory: get or create a TeamsbotObjects interface instance.""" key = f"{currentUser.id}_{mandateId}_{featureInstanceId}" if key not in _interfaces: _interfaces[key] = TeamsbotObjects(currentUser, mandateId, featureInstanceId) else: _interfaces[key].currentUser = currentUser _interfaces[key].mandateId = mandateId _interfaces[key].featureInstanceId = featureInstanceId return _interfaces[key] class TeamsbotObjects: """Database interface for Teams Bot feature.""" def __init__(self, currentUser: User, mandateId: str = None, featureInstanceId: str = None): self.currentUser = currentUser self.mandateId = mandateId self.featureInstanceId = featureInstanceId self.db = DatabaseConnector() # ========================================================================= # Sessions # ========================================================================= def getSessions(self, instanceId: str, includeEnded: bool = True) -> List[Dict[str, Any]]: """Get all sessions for a feature instance.""" filters = {"instanceId": instanceId} if not includeEnded: filters["status__ne"] = TeamsbotSessionStatus.ENDED.value records = self.db.getRecordset( TeamsbotSession, filters=filters, orderBy=[("startedAt", "DESC")] ) return records def getActiveSessions(self, instanceId: str) -> List[Dict[str, Any]]: """Get only active (non-ended, non-error) sessions.""" records = self.db.getRecordset( TeamsbotSession, filters={ "instanceId": instanceId, "status__in": [ TeamsbotSessionStatus.PENDING.value, TeamsbotSessionStatus.JOINING.value, TeamsbotSessionStatus.ACTIVE.value, ] } ) return records def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]: """Get a single session by ID.""" records = self.db.getRecordset(TeamsbotSession, filters={"id": sessionId}) return records[0] if records else None def createSession(self, sessionData: Dict[str, Any]) -> Dict[str, Any]: """Create a new session.""" sessionData["creationDate"] = getUtcTimestamp() sessionData["lastModified"] = getUtcTimestamp() return self.db.recordCreate(TeamsbotSession, sessionData) def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: """Update session fields.""" updates["lastModified"] = getUtcTimestamp() return self.db.recordModify(TeamsbotSession, sessionId, updates) def deleteSession(self, sessionId: str) -> bool: """Delete a session and all related transcripts and responses.""" # Delete related records first self._deleteTranscriptsBySession(sessionId) self._deleteResponsesBySession(sessionId) # Delete session return self.db.recordDelete(TeamsbotSession, sessionId) # ========================================================================= # Transcripts # ========================================================================= def getTranscripts(self, sessionId: str, limit: int = None, offset: int = None) -> List[Dict[str, Any]]: """Get transcript segments for a session, ordered by timestamp.""" filters = {"sessionId": sessionId} records = self.db.getRecordset( TeamsbotTranscript, filters=filters, orderBy=[("timestamp", "ASC")], limit=limit, offset=offset ) return records def getRecentTranscripts(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]: """Get the most recent N transcript segments for context building.""" records = self.db.getRecordset( TeamsbotTranscript, filters={"sessionId": sessionId}, orderBy=[("timestamp", "DESC")], limit=count ) # Reverse to get chronological order records.reverse() return records def createTranscript(self, transcriptData: Dict[str, Any]) -> Dict[str, Any]: """Create a new transcript segment.""" transcriptData["creationDate"] = getUtcTimestamp() return self.db.recordCreate(TeamsbotTranscript, transcriptData) def _deleteTranscriptsBySession(self, sessionId: str) -> int: """Delete all transcripts for a session.""" records = self.db.getRecordset(TeamsbotTranscript, filters={"sessionId": sessionId}) count = 0 for record in records: self.db.recordDelete(TeamsbotTranscript, record.get("id")) count += 1 return count # ========================================================================= # Bot Responses # ========================================================================= def getBotResponses(self, sessionId: str) -> List[Dict[str, Any]]: """Get all bot responses for a session.""" records = self.db.getRecordset( TeamsbotBotResponse, filters={"sessionId": sessionId}, orderBy=[("timestamp", "ASC")] ) return records def createBotResponse(self, responseData: Dict[str, Any]) -> Dict[str, Any]: """Create a new bot response record.""" responseData["creationDate"] = getUtcTimestamp() return self.db.recordCreate(TeamsbotBotResponse, responseData) def _deleteResponsesBySession(self, sessionId: str) -> int: """Delete all bot responses for a session.""" records = self.db.getRecordset(TeamsbotBotResponse, filters={"sessionId": sessionId}) count = 0 for record in records: self.db.recordDelete(TeamsbotBotResponse, record.get("id")) count += 1 return count # ========================================================================= # Stats / Aggregation # ========================================================================= def getSessionStats(self, sessionId: str) -> Dict[str, Any]: """Get aggregated statistics for a session.""" transcripts = self.db.getRecordset(TeamsbotTranscript, filters={"sessionId": sessionId}) responses = self.db.getRecordset(TeamsbotBotResponse, filters={"sessionId": sessionId}) totalCost = sum(r.get("priceCHF", 0) for r in responses) totalProcessingTime = sum(r.get("processingTime", 0) for r in responses) return { "transcriptSegments": len(transcripts), "botResponses": len(responses), "totalCostCHF": round(totalCost, 4), "totalProcessingTime": round(totalProcessingTime, 2), "speakers": list(set(t.get("speaker") for t in transcripts if t.get("speaker"))), }