From edecfb002c57374098b4085798466f21cb10b6a5 Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Fri, 13 Feb 2026 00:00:35 +0100 Subject: [PATCH] teamsbot --- modules/datamodels/datamodelAi.py | 3 + modules/features/teamsbot/__init__.py | 2 + modules/features/teamsbot/bridgeConnector.py | 121 ++++ modules/features/teamsbot/config.py | 61 ++ .../features/teamsbot/datamodelTeamsbot.py | 189 ++++++ .../teamsbot/interfaceFeatureTeamsbot.py | 193 ++++++ modules/features/teamsbot/mainTeamsbot.py | 282 +++++++++ .../features/teamsbot/routeFeatureTeamsbot.py | 411 +++++++++++++ modules/features/teamsbot/service.py | 549 ++++++++++++++++++ modules/routes/routeSecurityMsft.py | 8 +- modules/services/serviceAi/mainServiceAi.py | 241 ++++++++ 11 files changed, 2058 insertions(+), 2 deletions(-) create mode 100644 modules/features/teamsbot/__init__.py create mode 100644 modules/features/teamsbot/bridgeConnector.py create mode 100644 modules/features/teamsbot/config.py create mode 100644 modules/features/teamsbot/datamodelTeamsbot.py create mode 100644 modules/features/teamsbot/interfaceFeatureTeamsbot.py create mode 100644 modules/features/teamsbot/mainTeamsbot.py create mode 100644 modules/features/teamsbot/routeFeatureTeamsbot.py create mode 100644 modules/features/teamsbot/service.py diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 5b259a02..b94422a7 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -26,6 +26,9 @@ class OperationTypeEnum(str, Enum): WEB_SEARCH_DATA = "webSearch" # Returns list of URLs only WEB_CRAWL = "webCrawl" # Web crawl for a given URL + # Speech Operations (dedicated pipeline, bypasses standard model selection) + SPEECH_TEAMS = "speechTeams" # Teams Meeting AI analysis: decide if/how to respond + # Operation Type Rating - Helper class for capability ratings class OperationTypeRating(BaseModel): diff --git a/modules/features/teamsbot/__init__.py 
b/modules/features/teamsbot/__init__.py new file mode 100644 index 00000000..fdcc4f0e --- /dev/null +++ b/modules/features/teamsbot/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. diff --git a/modules/features/teamsbot/bridgeConnector.py b/modules/features/teamsbot/bridgeConnector.py new file mode 100644 index 00000000..8e43ee1f --- /dev/null +++ b/modules/features/teamsbot/bridgeConnector.py @@ -0,0 +1,121 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Bridge Connector - Communication with the .NET Media Bridge. +Handles HTTP and WebSocket communication for meeting join/leave and audio streaming. +""" + +import logging +import aiohttp +from typing import Optional, Dict, Any + +logger = logging.getLogger(__name__) + +# Default timeout for bridge HTTP calls +_BRIDGE_TIMEOUT = aiohttp.ClientTimeout(total=30) + + +class BridgeConnector: + """Connector to the .NET Media Bridge service.""" + + def __init__(self, bridgeUrl: Optional[str] = None): + self.bridgeUrl = (bridgeUrl or "").rstrip("/") + + def _isConfigured(self) -> bool: + """Check if the bridge URL is configured.""" + return bool(self.bridgeUrl) + + async def joinMeeting( + self, + meetingLink: str, + botName: str, + backgroundImageUrl: Optional[str], + sessionId: str, + gatewayCallbackUrl: str, + gatewayWsUrl: str, + gatewayBaseUrl: str = "", + ) -> Dict[str, Any]: + """ + Send join command to the .NET Media Bridge. + + Args: + gatewayBaseUrl: Base URL of this gateway instance so the bridge can + build full callback/WebSocket URLs per-session. + + Returns: + Dict with 'success' bool and 'bridgeSessionId' or 'error' string. + """ + if not self._isConfigured(): + logger.warning("Bridge URL not configured. 
Simulating join for development.") + return { + "success": True, + "bridgeSessionId": f"dev-{sessionId[:8]}", + "message": "Development mode: Bridge not connected" + } + + payload = { + "meetingLink": meetingLink, + "botName": botName, + "backgroundImageUrl": backgroundImageUrl, + "sessionId": sessionId, + "gatewayCallbackUrl": gatewayCallbackUrl, + "gatewayWsUrl": gatewayWsUrl, + "gatewayBaseUrl": gatewayBaseUrl, + } + + try: + async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: + async with session.post(f"{self.bridgeUrl}/bridge/join", json=payload) as resp: + if resp.status == 200: + data = await resp.json() + return { + "success": True, + "bridgeSessionId": data.get("bridgeSessionId"), + } + else: + errorText = await resp.text() + logger.error(f"Bridge join failed: {resp.status} - {errorText}") + return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"} + + except aiohttp.ClientError as e: + logger.error(f"Bridge connection error: {e}") + return {"success": False, "error": f"Bridge connection failed: {str(e)}"} + except Exception as e: + logger.error(f"Bridge join error: {e}") + return {"success": False, "error": str(e)} + + async def leaveMeeting(self, sessionId: str) -> Dict[str, Any]: + """Send leave command to the .NET Media Bridge.""" + if not self._isConfigured(): + logger.warning("Bridge URL not configured. 
Simulating leave for development.") + return {"success": True, "message": "Development mode: Bridge not connected"} + + try: + async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: + async with session.post(f"{self.bridgeUrl}/bridge/leave/{sessionId}") as resp: + if resp.status == 200: + return {"success": True} + else: + errorText = await resp.text() + logger.error(f"Bridge leave failed: {resp.status} - {errorText}") + return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"} + + except Exception as e: + logger.error(f"Bridge leave error: {e}") + return {"success": False, "error": str(e)} + + async def getStatus(self) -> Dict[str, Any]: + """Get bridge health and active sessions.""" + if not self._isConfigured(): + return {"healthy": False, "message": "Bridge URL not configured"} + + try: + async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: + async with session.get(f"{self.bridgeUrl}/bridge/status") as resp: + if resp.status == 200: + return await resp.json() + else: + return {"healthy": False, "error": f"Bridge returned {resp.status}"} + + except Exception as e: + return {"healthy": False, "error": str(e)} diff --git a/modules/features/teamsbot/config.py b/modules/features/teamsbot/config.py new file mode 100644 index 00000000..11a9e3ff --- /dev/null +++ b/modules/features/teamsbot/config.py @@ -0,0 +1,61 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Teamsbot Feature - Configuration utilities. +Loads and manages bot configuration from FeatureInstance.config. +""" + +import logging +from typing import Optional, Dict, Any + +from .datamodelTeamsbot import TeamsbotConfig + +logger = logging.getLogger(__name__) + + +def loadConfigFromInstance(instanceId: str) -> TeamsbotConfig: + """ + Load TeamsbotConfig from a FeatureInstance's config JSONB field. + Falls back to default config if not found or invalid. 
+ """ + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.interfaces.interfaceFeatures import getFeatureInterface + + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + instance = featureInterface.getFeatureInstance(instanceId) + + if not instance: + logger.warning(f"FeatureInstance '{instanceId}' not found, using default config") + return TeamsbotConfig() + + configDict = instance.get("config") if isinstance(instance, dict) else getattr(instance, "config", None) + + if configDict and isinstance(configDict, dict): + # Extract teamsbot-specific config if nested + teamsbotConfig = configDict.get("teamsbot", configDict) + return TeamsbotConfig(**teamsbotConfig) + + except Exception as e: + logger.error(f"Error loading teamsbot config for instance '{instanceId}': {e}") + + return TeamsbotConfig() + + +def saveConfigToInstance(instanceId: str, config: TeamsbotConfig) -> bool: + """Save TeamsbotConfig to FeatureInstance.config JSONB field.""" + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.interfaces.interfaceFeatures import getFeatureInterface + + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + + featureInterface.updateFeatureInstanceConfig(instanceId, config.model_dump()) + logger.info(f"Teamsbot config saved for instance '{instanceId}'") + return True + + except Exception as e: + logger.error(f"Error saving teamsbot config for instance '{instanceId}': {e}") + return False diff --git a/modules/features/teamsbot/datamodelTeamsbot.py b/modules/features/teamsbot/datamodelTeamsbot.py new file mode 100644 index 00000000..febdce9e --- /dev/null +++ b/modules/features/teamsbot/datamodelTeamsbot.py @@ -0,0 +1,189 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Teamsbot Feature - Data Models. +Pydantic models for Teams Bot sessions, transcripts, bot responses, and configuration. 
+""" +from typing import Optional, List, Dict, Any +from pydantic import BaseModel, Field +from enum import Enum +import uuid + + +# ============================================================================ +# Enums +# ============================================================================ + +class TeamsbotSessionStatus(str, Enum): + """Status of a Teams Bot session.""" + PENDING = "pending" # Session created, waiting for bridge + JOINING = "joining" # Bridge is joining the meeting + ACTIVE = "active" # Bot is in the meeting and processing audio + LEAVING = "leaving" # Bot is leaving the meeting + ENDED = "ended" # Session completed normally + ERROR = "error" # Session ended with an error + + +class TeamsbotResponseType(str, Enum): + """Type of bot response delivery.""" + AUDIO = "audio" # Voice response only + CHAT = "chat" # Chat message only + BOTH = "both" # Voice + chat message + + +class TeamsbotDetectedIntent(str, Enum): + """Intent detected by the SPEECH_TEAMS AI handler.""" + ADDRESSED = "addressed" # Bot was directly addressed + QUESTION = "question" # A general question was asked + PROACTIVE = "proactive" # Bot has a valuable proactive contribution + NONE = "none" # No action needed + + +class TeamsbotResponseMode(str, Enum): + """How the bot should respond.""" + AUTO = "auto" # Fully automatic: AI decides when to respond + MANUAL = "manual" # User triggers responses manually from UI + TRANSCRIBE_ONLY = "transcribeOnly" # Only transcribe, no AI responses + + +# ============================================================================ +# Database Models (stored in PostgreSQL) +# ============================================================================ + +class TeamsbotSession(BaseModel): + """A Teams Bot meeting session.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Session ID") + instanceId: str = Field(description="Feature instance ID (FK)") + mandateId: str = Field(description="Mandate ID (FK)") + 
meetingLink: str = Field(description="Teams meeting join link") + botName: str = Field(default="AI Assistant", description="Display name of the bot in the meeting") + backgroundImageUrl: Optional[str] = Field(default=None, description="Background image URL for the bot's video feed") + status: TeamsbotSessionStatus = Field(default=TeamsbotSessionStatus.PENDING, description="Current session status") + startedAt: Optional[str] = Field(default=None, description="ISO timestamp when session started") + endedAt: Optional[str] = Field(default=None, description="ISO timestamp when session ended") + startedByUserId: str = Field(description="User ID who started the session") + bridgeSessionId: Optional[str] = Field(default=None, description="Session ID on the .NET Media Bridge") + meetingChatId: Optional[str] = Field(default=None, description="Teams meeting chat ID for Graph API messages") + summary: Optional[str] = Field(default=None, description="AI-generated meeting summary (after session ends)") + errorMessage: Optional[str] = Field(default=None, description="Error message if status is ERROR") + transcriptSegmentCount: int = Field(default=0, description="Number of transcript segments in this session") + botResponseCount: int = Field(default=0, description="Number of bot responses in this session") + creationDate: Optional[str] = Field(default=None, description="ISO timestamp of record creation") + lastModified: Optional[str] = Field(default=None, description="ISO timestamp of last modification") + + +class TeamsbotTranscript(BaseModel): + """A single transcript segment from the meeting.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Transcript segment ID") + sessionId: str = Field(description="Session ID (FK)") + speaker: Optional[str] = Field(default=None, description="Speaker name or identifier") + text: str = Field(description="Transcribed text") + timestamp: str = Field(description="ISO timestamp of the speech segment") + confidence: 
float = Field(default=0.0, ge=0.0, le=1.0, description="STT confidence score") + language: Optional[str] = Field(default=None, description="Detected language code (e.g., de-DE)") + isFinal: bool = Field(default=True, description="Whether this is a final or interim result") + creationDate: Optional[str] = Field(default=None, description="ISO timestamp of record creation") + + +class TeamsbotBotResponse(BaseModel): + """A bot response generated during a meeting session.""" + id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Response ID") + sessionId: str = Field(description="Session ID (FK)") + responseText: str = Field(description="The bot's response text") + responseType: TeamsbotResponseType = Field(default=TeamsbotResponseType.BOTH, description="How the response was delivered") + detectedIntent: TeamsbotDetectedIntent = Field(default=TeamsbotDetectedIntent.NONE, description="What triggered the response") + reasoning: Optional[str] = Field(default=None, description="AI reasoning for why it responded") + triggeredByTranscriptId: Optional[str] = Field(default=None, description="Transcript segment that triggered this response") + modelName: Optional[str] = Field(default=None, description="AI model used for this response") + processingTime: float = Field(default=0.0, description="Processing time in seconds") + priceCHF: float = Field(default=0.0, description="Cost of this AI call in CHF") + timestamp: Optional[str] = Field(default=None, description="ISO timestamp of the response") + creationDate: Optional[str] = Field(default=None, description="ISO timestamp of record creation") + + +# ============================================================================ +# Configuration Model (stored in FeatureInstance.config JSONB) +# ============================================================================ + +class TeamsbotConfig(BaseModel): + """Configuration for a Teams Bot feature instance.""" + botName: str = Field(default="AI Assistant", 
description="Default bot display name") + backgroundImageUrl: Optional[str] = Field(default=None, description="Default background image URL") + aiSystemPrompt: str = Field( + default="Du bist ein hilfreicher Meeting-Assistent. Fasse wichtige Punkte zusammen und beantworte Fragen sachlich.", + description="Custom system prompt for the AI analysis" + ) + responseMode: TeamsbotResponseMode = Field(default=TeamsbotResponseMode.AUTO, description="How the bot responds") + language: str = Field(default="de-DE", description="Primary language for STT/TTS") + voiceId: Optional[str] = Field(default=None, description="Google TTS voice ID (e.g., de-DE-Standard-A)") + bridgeUrl: Optional[str] = Field(default=None, description="URL of the .NET Media Bridge service") + triggerIntervalSeconds: int = Field(default=10, ge=3, le=60, description="Seconds between periodic AI analysis triggers") + triggerCooldownSeconds: int = Field(default=3, ge=1, le=30, description="Minimum seconds between AI calls") + contextWindowSegments: int = Field(default=20, ge=5, le=100, description="Number of transcript segments to include in AI context") + + +# ============================================================================ +# API Request/Response Models +# ============================================================================ + +class TeamsbotStartSessionRequest(BaseModel): + """Request to start a new Teams Bot session.""" + meetingLink: str = Field(description="Teams meeting join link (e.g., https://teams.microsoft.com/l/meetup-join/...)") + botName: Optional[str] = Field(default=None, description="Override bot name for this session") + backgroundImageUrl: Optional[str] = Field(default=None, description="Override background image for this session") + connectionId: Optional[str] = Field(default=None, description="Microsoft connection ID for Graph API access") + + +class TeamsbotSessionResponse(BaseModel): + """Response for session details.""" + session: TeamsbotSession + transcripts: 
Optional[List[TeamsbotTranscript]] = Field(default=None, description="Transcript segments (if requested)") + botResponses: Optional[List[TeamsbotBotResponse]] = Field(default=None, description="Bot responses (if requested)") + + +class TeamsbotConfigUpdateRequest(BaseModel): + """Request to update teamsbot configuration.""" + botName: Optional[str] = None + backgroundImageUrl: Optional[str] = None + aiSystemPrompt: Optional[str] = None + responseMode: Optional[TeamsbotResponseMode] = None + language: Optional[str] = None + voiceId: Optional[str] = None + bridgeUrl: Optional[str] = None + triggerIntervalSeconds: Optional[int] = None + triggerCooldownSeconds: Optional[int] = None + contextWindowSegments: Optional[int] = None + + +# ============================================================================ +# SPEECH_TEAMS AI Response Model +# ============================================================================ + +class SpeechTeamsResponse(BaseModel): + """Structured response from the SPEECH_TEAMS AI handler.""" + shouldRespond: bool = Field(description="Whether the bot should respond") + responseText: Optional[str] = Field(default=None, description="The bot's response text (only if shouldRespond=True)") + reasoning: str = Field(default="", description="Reasoning for the decision (for logging/debug)") + detectedIntent: str = Field(default="none", description="Detected intent: addressed, question, proactive, none") + + +# ============================================================================ +# Bridge Communication Models +# ============================================================================ + +class BridgeJoinRequest(BaseModel): + """Request sent to .NET Media Bridge to join a meeting.""" + meetingLink: str = Field(description="Teams meeting join link") + botName: str = Field(description="Bot display name") + backgroundImageUrl: Optional[str] = Field(default=None, description="Background image URL") + gatewayCallbackUrl: str = 
Field(description="Gateway URL for bridge callbacks") + gatewayWsUrl: str = Field(description="Gateway WebSocket URL for audio streaming") + sessionId: str = Field(description="Session ID for correlation") + gatewayBaseUrl: str = Field(description="Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net)") + + +class BridgeStatusResponse(BaseModel): + """Status response from the .NET Media Bridge.""" + healthy: bool = Field(description="Whether the bridge is healthy") + activeSessions: int = Field(default=0, description="Number of active meeting sessions") + sessions: Optional[List[Dict[str, Any]]] = Field(default=None, description="Details of active sessions") diff --git a/modules/features/teamsbot/interfaceFeatureTeamsbot.py b/modules/features/teamsbot/interfaceFeatureTeamsbot.py new file mode 100644 index 00000000..da5a8a62 --- /dev/null +++ b/modules/features/teamsbot/interfaceFeatureTeamsbot.py @@ -0,0 +1,193 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Interface to Teamsbot database. +Uses the PostgreSQL connector for data access with user/mandate filtering. 
+""" + +import logging +from typing import Dict, Any, List, Optional + +from modules.datamodels.datamodelUam import User +from modules.connectors.connectorDbPostgre import DatabaseConnector +from modules.shared.timeUtils import getUtcTimestamp +from modules.shared.configuration import APP_CONFIG + +from .datamodelTeamsbot import ( + TeamsbotSession, + TeamsbotSessionStatus, + TeamsbotTranscript, + TeamsbotBotResponse, +) + +logger = logging.getLogger(__name__) + +# Singleton factory +_interfaces = {} + + +def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None): + """Factory: get or create a TeamsbotObjects interface instance.""" + key = f"{currentUser.id}_{mandateId}_{featureInstanceId}" + if key not in _interfaces: + _interfaces[key] = TeamsbotObjects(currentUser, mandateId, featureInstanceId) + else: + _interfaces[key].currentUser = currentUser + _interfaces[key].mandateId = mandateId + _interfaces[key].featureInstanceId = featureInstanceId + return _interfaces[key] + + +class TeamsbotObjects: + """Database interface for Teams Bot feature.""" + + def __init__(self, currentUser: User, mandateId: str = None, featureInstanceId: str = None): + self.currentUser = currentUser + self.mandateId = mandateId + self.featureInstanceId = featureInstanceId + self.db = DatabaseConnector() + + # ========================================================================= + # Sessions + # ========================================================================= + + def getSessions(self, instanceId: str, includeEnded: bool = True) -> List[Dict[str, Any]]: + """Get all sessions for a feature instance.""" + filters = {"instanceId": instanceId} + if not includeEnded: + filters["status__ne"] = TeamsbotSessionStatus.ENDED.value + + records = self.db.getRecordset( + TeamsbotSession, + filters=filters, + orderBy=[("startedAt", "DESC")] + ) + return records + + def getActiveSessions(self, instanceId: str) -> List[Dict[str, Any]]: + """Get only active 
(non-ended, non-error) sessions.""" + records = self.db.getRecordset( + TeamsbotSession, + filters={ + "instanceId": instanceId, + "status__in": [ + TeamsbotSessionStatus.PENDING.value, + TeamsbotSessionStatus.JOINING.value, + TeamsbotSessionStatus.ACTIVE.value, + ] + } + ) + return records + + def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]: + """Get a single session by ID.""" + records = self.db.getRecordset(TeamsbotSession, filters={"id": sessionId}) + return records[0] if records else None + + def createSession(self, sessionData: Dict[str, Any]) -> Dict[str, Any]: + """Create a new session.""" + sessionData["creationDate"] = getUtcTimestamp() + sessionData["lastModified"] = getUtcTimestamp() + return self.db.recordCreate(TeamsbotSession, sessionData) + + def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """Update session fields.""" + updates["lastModified"] = getUtcTimestamp() + return self.db.recordModify(TeamsbotSession, sessionId, updates) + + def deleteSession(self, sessionId: str) -> bool: + """Delete a session and all related transcripts and responses.""" + # Delete related records first + self._deleteTranscriptsBySession(sessionId) + self._deleteResponsesBySession(sessionId) + # Delete session + return self.db.recordDelete(TeamsbotSession, sessionId) + + # ========================================================================= + # Transcripts + # ========================================================================= + + def getTranscripts(self, sessionId: str, limit: int = None, offset: int = None) -> List[Dict[str, Any]]: + """Get transcript segments for a session, ordered by timestamp.""" + filters = {"sessionId": sessionId} + records = self.db.getRecordset( + TeamsbotTranscript, + filters=filters, + orderBy=[("timestamp", "ASC")], + limit=limit, + offset=offset + ) + return records + + def getRecentTranscripts(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]: + """Get 
the most recent N transcript segments for context building.""" + records = self.db.getRecordset( + TeamsbotTranscript, + filters={"sessionId": sessionId}, + orderBy=[("timestamp", "DESC")], + limit=count + ) + # Reverse to get chronological order + records.reverse() + return records + + def createTranscript(self, transcriptData: Dict[str, Any]) -> Dict[str, Any]: + """Create a new transcript segment.""" + transcriptData["creationDate"] = getUtcTimestamp() + return self.db.recordCreate(TeamsbotTranscript, transcriptData) + + def _deleteTranscriptsBySession(self, sessionId: str) -> int: + """Delete all transcripts for a session.""" + records = self.db.getRecordset(TeamsbotTranscript, filters={"sessionId": sessionId}) + count = 0 + for record in records: + self.db.recordDelete(TeamsbotTranscript, record.get("id")) + count += 1 + return count + + # ========================================================================= + # Bot Responses + # ========================================================================= + + def getBotResponses(self, sessionId: str) -> List[Dict[str, Any]]: + """Get all bot responses for a session.""" + records = self.db.getRecordset( + TeamsbotBotResponse, + filters={"sessionId": sessionId}, + orderBy=[("timestamp", "ASC")] + ) + return records + + def createBotResponse(self, responseData: Dict[str, Any]) -> Dict[str, Any]: + """Create a new bot response record.""" + responseData["creationDate"] = getUtcTimestamp() + return self.db.recordCreate(TeamsbotBotResponse, responseData) + + def _deleteResponsesBySession(self, sessionId: str) -> int: + """Delete all bot responses for a session.""" + records = self.db.getRecordset(TeamsbotBotResponse, filters={"sessionId": sessionId}) + count = 0 + for record in records: + self.db.recordDelete(TeamsbotBotResponse, record.get("id")) + count += 1 + return count + + # ========================================================================= + # Stats / Aggregation + # 
========================================================================= + + def getSessionStats(self, sessionId: str) -> Dict[str, Any]: + """Get aggregated statistics for a session.""" + transcripts = self.db.getRecordset(TeamsbotTranscript, filters={"sessionId": sessionId}) + responses = self.db.getRecordset(TeamsbotBotResponse, filters={"sessionId": sessionId}) + + totalCost = sum(r.get("priceCHF", 0) for r in responses) + totalProcessingTime = sum(r.get("processingTime", 0) for r in responses) + + return { + "transcriptSegments": len(transcripts), + "botResponses": len(responses), + "totalCostCHF": round(totalCost, 4), + "totalProcessingTime": round(totalProcessingTime, 2), + "speakers": list(set(t.get("speaker") for t in transcripts if t.get("speaker"))), + } diff --git a/modules/features/teamsbot/mainTeamsbot.py b/modules/features/teamsbot/mainTeamsbot.py new file mode 100644 index 00000000..6dd161e0 --- /dev/null +++ b/modules/features/teamsbot/mainTeamsbot.py @@ -0,0 +1,282 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Teamsbot Feature Container - Main Module. +Handles feature initialization and RBAC catalog registration. 
+""" + +import logging +from typing import Dict, List, Any + +logger = logging.getLogger(__name__) + +# Feature metadata +FEATURE_CODE = "teamsbot" +FEATURE_LABEL = {"en": "Teams Bot", "de": "Teams Bot", "fr": "Teams Bot"} +FEATURE_ICON = "mdi-headset" + +# UI Objects for RBAC catalog +UI_OBJECTS = [ + { + "objectKey": "ui.feature.teamsbot.dashboard", + "label": {"en": "Dashboard", "de": "Dashboard", "fr": "Tableau de bord"}, + "meta": {"area": "dashboard"} + }, + { + "objectKey": "ui.feature.teamsbot.sessions", + "label": {"en": "Sessions", "de": "Sitzungen", "fr": "Sessions"}, + "meta": {"area": "sessions"} + }, + { + "objectKey": "ui.feature.teamsbot.settings", + "label": {"en": "Settings", "de": "Einstellungen", "fr": "Paramètres"}, + "meta": {"area": "settings", "admin_only": True} + }, +] + +# DATA Objects for RBAC catalog (tables/entities) +DATA_OBJECTS = [ + { + "objectKey": "data.feature.teamsbot.TeamsbotSession", + "label": {"en": "Session", "de": "Sitzung", "fr": "Session"}, + "meta": {"table": "TeamsbotSession", "fields": ["id", "meetingLink", "botName", "status", "startedAt", "endedAt"]} + }, + { + "objectKey": "data.feature.teamsbot.TeamsbotTranscript", + "label": {"en": "Transcript", "de": "Transkript", "fr": "Transcription"}, + "meta": {"table": "TeamsbotTranscript", "fields": ["id", "sessionId", "speaker", "text", "timestamp"]} + }, + { + "objectKey": "data.feature.teamsbot.TeamsbotBotResponse", + "label": {"en": "Bot Response", "de": "Bot-Antwort", "fr": "Réponse du bot"}, + "meta": {"table": "TeamsbotBotResponse", "fields": ["id", "sessionId", "responseText", "detectedIntent"]} + }, + { + "objectKey": "data.feature.teamsbot.*", + "label": {"en": "All Teams Bot Data", "de": "Alle Teams Bot Daten", "fr": "Toutes les données Teams Bot"}, + "meta": {"wildcard": True, "description": "Wildcard for all teamsbot data tables"} + }, +] + +# Resource Objects for RBAC catalog +RESOURCE_OBJECTS = [ + { + "objectKey": "resource.feature.teamsbot.session.start", 
+ "label": {"en": "Start Session", "de": "Sitzung starten", "fr": "Démarrer session"}, + "meta": {"endpoint": "/api/teamsbot/{instanceId}/sessions", "method": "POST"} + }, + { + "objectKey": "resource.feature.teamsbot.session.stop", + "label": {"en": "Stop Session", "de": "Sitzung beenden", "fr": "Arrêter session"}, + "meta": {"endpoint": "/api/teamsbot/{instanceId}/sessions/{sessionId}/stop", "method": "POST"} + }, + { + "objectKey": "resource.feature.teamsbot.session.delete", + "label": {"en": "Delete Session", "de": "Sitzung löschen", "fr": "Supprimer session"}, + "meta": {"endpoint": "/api/teamsbot/{instanceId}/sessions/{sessionId}", "method": "DELETE"} + }, + { + "objectKey": "resource.feature.teamsbot.config.edit", + "label": {"en": "Edit Configuration", "de": "Konfiguration bearbeiten", "fr": "Modifier configuration"}, + "meta": {"endpoint": "/api/teamsbot/{instanceId}/config", "method": "PUT", "admin_only": True} + }, +] + +# Template roles for this feature with AccessRules +TEMPLATE_ROLES = [ + { + "roleLabel": "teamsbot-admin", + "description": { + "en": "Teams Bot Administrator - Full access to all sessions and settings", + "de": "Teams Bot Administrator - Vollzugriff auf alle Sitzungen und Einstellungen", + "fr": "Administrateur Teams Bot - Accès complet aux sessions et paramètres" + }, + "accessRules": [ + # Full UI access (all views including settings) + {"context": "UI", "item": None, "view": True}, + # Full DATA access + {"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"}, + # All resources + {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.start", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.stop", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.delete", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.teamsbot.config.edit", "view": True}, + ] + }, + { + "roleLabel": "teamsbot-user", + 
"description": { + "en": "Teams Bot User - Can start/stop sessions and view transcripts", + "de": "Teams Bot Benutzer - Kann Sitzungen starten/stoppen und Transkripte einsehen", + "fr": "Utilisateur Teams Bot - Peut démarrer/arrêter des sessions et voir les transcriptions" + }, + "accessRules": [ + # UI access to dashboard and sessions (not settings) + {"context": "UI", "item": "ui.feature.teamsbot.dashboard", "view": True}, + {"context": "UI", "item": "ui.feature.teamsbot.sessions", "view": True}, + # Own records only + {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotSession", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"}, + {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotTranscript", "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, + {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotBotResponse", "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"}, + # Start and stop sessions + {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.start", "view": True}, + {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.stop", "view": True}, + ] + }, +] + + +def getFeatureDefinition() -> Dict[str, Any]: + """Return the feature definition for registration.""" + return { + "code": FEATURE_CODE, + "label": FEATURE_LABEL, + "icon": FEATURE_ICON + } + + +def getUiObjects() -> List[Dict[str, Any]]: + """Return UI objects for RBAC catalog registration.""" + return UI_OBJECTS + + +def getResourceObjects() -> List[Dict[str, Any]]: + """Return resource objects for RBAC catalog registration.""" + return RESOURCE_OBJECTS + + +def getTemplateRoles() -> List[Dict[str, Any]]: + """Return template roles for this feature.""" + return TEMPLATE_ROLES + + +def getDataObjects() -> List[Dict[str, Any]]: + """Return DATA objects for RBAC catalog registration.""" + return DATA_OBJECTS + + +def registerFeature(catalogService) -> bool: + """Register this feature's RBAC 
objects in the catalog.""" + try: + for uiObj in UI_OBJECTS: + catalogService.registerUiObject( + featureCode=FEATURE_CODE, + objectKey=uiObj["objectKey"], + label=uiObj["label"], + meta=uiObj.get("meta") + ) + + for resObj in RESOURCE_OBJECTS: + catalogService.registerResourceObject( + featureCode=FEATURE_CODE, + objectKey=resObj["objectKey"], + label=resObj["label"], + meta=resObj.get("meta") + ) + + for dataObj in DATA_OBJECTS: + catalogService.registerDataObject( + featureCode=FEATURE_CODE, + objectKey=dataObj["objectKey"], + label=dataObj["label"], + meta=dataObj.get("meta") + ) + + _syncTemplateRolesToDb() + + logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI, {len(RESOURCE_OBJECTS)} resource, {len(DATA_OBJECTS)} data objects") + return True + + except Exception as e: + logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}") + return False + + +def _syncTemplateRolesToDb() -> int: + """Sync template roles and their AccessRules to the database.""" + try: + from modules.interfaces.interfaceDbApp import getRootInterface + from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext + + rootInterface = getRootInterface() + existingRoles = rootInterface.getRolesByFeatureCode(FEATURE_CODE) + templateRoles = [r for r in existingRoles if r.mandateId is None] + existingRoleLabels = {r.roleLabel: str(r.id) for r in templateRoles} + + createdCount = 0 + for roleTemplate in TEMPLATE_ROLES: + roleLabel = roleTemplate["roleLabel"] + + if roleLabel in existingRoleLabels: + roleId = existingRoleLabels[roleLabel] + _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) + else: + newRole = Role( + roleLabel=roleLabel, + description=roleTemplate.get("description", {}), + featureCode=FEATURE_CODE, + mandateId=None, + featureInstanceId=None, + isSystemRole=False + ) + createdRole = rootInterface.db.recordCreate(Role, newRole.model_dump()) + roleId = createdRole.get("id") + 
_ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", [])) + logger.info(f"Created template role '{roleLabel}' with ID {roleId}") + createdCount += 1 + + if createdCount > 0: + logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles") + + return createdCount + + except Exception as e: + logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}") + return 0 + + +def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int: + """Ensure AccessRules exist for a role based on templates.""" + from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext + + existingRules = rootInterface.getAccessRulesByRole(roleId) + existingSignatures = set() + for rule in existingRules: + sig = (rule.context.value if rule.context else None, rule.item) + existingSignatures.add(sig) + + createdCount = 0 + for template in ruleTemplates: + context = template.get("context", "UI") + item = template.get("item") + sig = (context, item) + + if sig in existingSignatures: + continue + + if context == "UI": + contextEnum = AccessRuleContext.UI + elif context == "DATA": + contextEnum = AccessRuleContext.DATA + elif context == "RESOURCE": + contextEnum = AccessRuleContext.RESOURCE + else: + contextEnum = context + + newRule = AccessRule( + roleId=roleId, + context=contextEnum, + item=item, + view=template.get("view", False), + read=template.get("read"), + create=template.get("create"), + update=template.get("update"), + delete=template.get("delete"), + ) + rootInterface.db.recordCreate(AccessRule, newRule.model_dump()) + createdCount += 1 + + if createdCount > 0: + logger.debug(f"Created {createdCount} AccessRules for role {roleId}") + + return createdCount diff --git a/modules/features/teamsbot/routeFeatureTeamsbot.py b/modules/features/teamsbot/routeFeatureTeamsbot.py new file mode 100644 index 00000000..e4d390d2 --- /dev/null +++ 
b/modules/features/teamsbot/routeFeatureTeamsbot.py @@ -0,0 +1,411 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Teamsbot routes for the backend API. +Implements Teams Bot session management, live streaming, and configuration endpoints. +""" + +import logging +import json +import asyncio +from typing import Optional +from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, Request +from fastapi.responses import StreamingResponse + +# Import auth modules +from modules.auth import limiter, getRequestContext, RequestContext + +# Import interfaces +from . import interfaceFeatureTeamsbot as interfaceDb +from modules.interfaces.interfaceDbApp import getRootInterface +from modules.interfaces.interfaceFeatures import getFeatureInterface + +# Import models +from .datamodelTeamsbot import ( + TeamsbotSession, + TeamsbotSessionStatus, + TeamsbotStartSessionRequest, + TeamsbotSessionResponse, + TeamsbotConfigUpdateRequest, + TeamsbotConfig, +) + +# Import service +from .service import TeamsbotService + +logger = logging.getLogger(__name__) + +# Create router +router = APIRouter( + prefix="/api/teamsbot", + tags=["Teamsbot"], + responses={404: {"description": "Not found"}} +) + + +# ========================================================================= +# Helpers +# ========================================================================= + +def _getInterface(context: RequestContext, instanceId: Optional[str] = None): + """Get teamsbot interface with instance context.""" + mandateId = str(context.mandateId) if context.mandateId else None + return interfaceDb.getInterface( + context.user, + mandateId=mandateId, + featureInstanceId=instanceId + ) + + +def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: + """Validate user access to the feature instance. 
Returns mandateId.""" + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + + instance = featureInterface.getFeatureInstance(instanceId) + if not instance: + raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found") + + mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None) + if not mandateId: + raise HTTPException(status_code=500, detail="Feature instance has no mandateId") + + return str(mandateId) + + +def _getInstanceConfig(instanceId: str) -> TeamsbotConfig: + """Load TeamsbotConfig from FeatureInstance.config JSONB field.""" + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + instance = featureInterface.getFeatureInstance(instanceId) + + if not instance: + return TeamsbotConfig() + + configDict = instance.get("config") if isinstance(instance, dict) else getattr(instance, "config", None) + if configDict and isinstance(configDict, dict): + try: + return TeamsbotConfig(**configDict) + except Exception: + pass + return TeamsbotConfig() + + +# ========================================================================= +# Session Endpoints +# ========================================================================= + +@router.post("/{instanceId}/sessions") +@limiter.limit("10/minute") +async def startSession( + instanceId: str, + request: TeamsbotStartSessionRequest, + httpRequest: Request, + context: RequestContext = Depends(getRequestContext), +): + """Start a new Teams Bot session -- bot joins the specified meeting.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + config = _getInstanceConfig(instanceId) + + # Create session + sessionData = TeamsbotSession( + instanceId=instanceId, + mandateId=mandateId, + meetingLink=request.meetingLink, + botName=request.botName or config.botName, + 
backgroundImageUrl=request.backgroundImageUrl or config.backgroundImageUrl, + status=TeamsbotSessionStatus.PENDING, + startedByUserId=str(context.user.id), + ).model_dump() + + createdSession = interface.createSession(sessionData) + sessionId = createdSession.get("id") + + # Derive gateway base URL from the incoming request so the bridge + # can build full callback/WS URLs targeting this specific gateway instance. + gatewayBaseUrl = str(httpRequest.base_url).rstrip("/") + + # Start the bot in background (join meeting via bridge) + service = TeamsbotService(context.user, mandateId, instanceId, config) + asyncio.create_task( + service.joinMeeting(sessionId, request.meetingLink, request.connectionId, gatewayBaseUrl) + ) + + logger.info(f"Teamsbot session {sessionId} created for instance {instanceId}") + return {"session": createdSession} + + +@router.get("/{instanceId}/sessions") +@limiter.limit("30/minute") +async def listSessions( + instanceId: str, + includeEnded: bool = Query(True, description="Include ended sessions"), + context: RequestContext = Depends(getRequestContext) +): + """List all sessions for a feature instance.""" + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + sessions = interface.getSessions(instanceId, includeEnded=includeEnded) + return {"sessions": sessions} + + +@router.get("/{instanceId}/sessions/{sessionId}") +@limiter.limit("30/minute") +async def getSession( + instanceId: str, + sessionId: str, + includeTranscripts: bool = Query(True), + includeResponses: bool = Query(True), + context: RequestContext = Depends(getRequestContext) +): + """Get session details with optional transcripts and bot responses.""" + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") + + result = {"session": session} + + if 
includeTranscripts: + result["transcripts"] = interface.getTranscripts(sessionId) + + if includeResponses: + result["botResponses"] = interface.getBotResponses(sessionId) + result["stats"] = interface.getSessionStats(sessionId) + + return result + + +@router.get("/{instanceId}/sessions/{sessionId}/stream") +@limiter.limit("10/minute") +async def streamSession( + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext) +): + """ + SSE live stream for a session. + Streams transcript segments and bot responses in real-time. + Events: transcript, botResponse, statusChange, error + """ + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") + + async def _eventGenerator(): + """Generate SSE events from the session event queue.""" + from .service import _sessionEvents + + # Send initial session state + yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n" + + # Stream events + eventQueue = _sessionEvents.get(sessionId) + if not eventQueue: + _sessionEvents[sessionId] = asyncio.Queue() + eventQueue = _sessionEvents[sessionId] + + try: + while True: + try: + event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) + yield f"data: {json.dumps(event)}\n\n" + + # Stop streaming if session ended + if event.get("type") == "statusChange" and event.get("data", {}).get("status") in ["ended", "error"]: + break + except asyncio.TimeoutError: + # Send keepalive ping + yield f"data: {json.dumps({'type': 'ping'})}\n\n" + except asyncio.CancelledError: + pass + + return StreamingResponse( + _eventGenerator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + } + ) + + +@router.post("/{instanceId}/sessions/{sessionId}/stop") +@limiter.limit("10/minute") +async 
def stopSession( + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext) +): + """Stop an active session -- bot leaves the meeting.""" + mandateId = _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + config = _getInstanceConfig(instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") + + currentStatus = session.get("status") + if currentStatus in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]: + raise HTTPException(status_code=400, detail=f"Session already in terminal state: {currentStatus}") + + # Stop the bot + service = TeamsbotService(context.user, mandateId, instanceId, config) + await service.leaveMeeting(sessionId) + + logger.info(f"Teamsbot session {sessionId} stop requested") + return {"status": "stopping", "sessionId": sessionId} + + +@router.delete("/{instanceId}/sessions/{sessionId}") +@limiter.limit("10/minute") +async def deleteSession( + instanceId: str, + sessionId: str, + context: RequestContext = Depends(getRequestContext) +): + """Delete a session and all related data.""" + _validateInstanceAccess(instanceId, context) + interface = _getInterface(context, instanceId) + + session = interface.getSession(sessionId) + if not session: + raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") + + # Don't delete active sessions + currentStatus = session.get("status") + if currentStatus in [TeamsbotSessionStatus.ACTIVE.value, TeamsbotSessionStatus.JOINING.value]: + raise HTTPException(status_code=400, detail="Cannot delete an active session. 
Stop it first.") + + interface.deleteSession(sessionId) + logger.info(f"Teamsbot session {sessionId} deleted") + return {"deleted": True, "sessionId": sessionId} + + +# ========================================================================= +# Configuration Endpoints +# ========================================================================= + +@router.get("/{instanceId}/config") +@limiter.limit("30/minute") +async def getConfig( + instanceId: str, + context: RequestContext = Depends(getRequestContext) +): + """Get the teamsbot configuration for a feature instance.""" + _validateInstanceAccess(instanceId, context) + config = _getInstanceConfig(instanceId) + return {"config": config.model_dump()} + + +@router.put("/{instanceId}/config") +@limiter.limit("10/minute") +async def updateConfig( + instanceId: str, + request: TeamsbotConfigUpdateRequest, + context: RequestContext = Depends(getRequestContext) +): + """Update the teamsbot configuration.""" + mandateId = _validateInstanceAccess(instanceId, context) + + # Load current config and merge updates + currentConfig = _getInstanceConfig(instanceId) + updateDict = request.model_dump(exclude_none=True) + mergedConfig = currentConfig.model_copy(update=updateDict) + + # Save to FeatureInstance.config + rootInterface = getRootInterface() + featureInterface = getFeatureInterface(rootInterface.db) + featureInterface.updateFeatureInstanceConfig(instanceId, mergedConfig.model_dump()) + + logger.info(f"Teamsbot config updated for instance {instanceId}: {list(updateDict.keys())}") + return {"config": mergedConfig.model_dump()} + + +# ========================================================================= +# Bridge Communication Endpoints (called by .NET Media Bridge) +# ========================================================================= + +@router.post("/{instanceId}/bridge/status") +async def bridgeStatusCallback( + instanceId: str, + statusData: dict, +): + """ + Callback endpoint for the .NET Media Bridge to report 
status updates. + Called when: bot joins/leaves meeting, errors occur, etc. + Note: No user auth -- authenticated via bridge API key. + """ + sessionId = statusData.get("sessionId") + status = statusData.get("status") + errorMessage = statusData.get("errorMessage") + + if not sessionId or not status: + raise HTTPException(status_code=400, detail="Missing sessionId or status") + + # TODO: Validate bridge API key from headers + + logger.info(f"Bridge status callback: session={sessionId}, status={status}") + + # Update session status + from modules.datamodels.datamodelUam import User + systemUser = User(id="system", email="system@internal") + interface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId) + + updates = {"status": status} + if errorMessage: + updates["errorMessage"] = errorMessage + if status == TeamsbotSessionStatus.ACTIVE.value: + from modules.shared.timeUtils import getUtcTimestamp + updates["startedAt"] = getUtcTimestamp() + elif status in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]: + from modules.shared.timeUtils import getUtcTimestamp + updates["endedAt"] = getUtcTimestamp() + + interface.updateSession(sessionId, updates) + + # Emit SSE event + from .service import _emitSessionEvent + await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage}) + + return {"received": True} + + +@router.websocket("/{instanceId}/bridge/audio/{sessionId}") +async def bridgeAudioWebsocket( + websocket: WebSocket, + instanceId: str, + sessionId: str, +): + """ + Bidirectional WebSocket for audio streaming with the .NET Media Bridge. 
+ Bridge sends: PCM audio frames (LINEAR16, 16kHz, mono) + Gateway sends: TTS audio responses + """ + await websocket.accept() + logger.info(f"Bridge audio WebSocket connected: session={sessionId}") + + # TODO: Validate bridge API key from headers/query params + + config = _getInstanceConfig(instanceId) + + from modules.datamodels.datamodelUam import User + systemUser = User(id="system", email="system@internal") + service = TeamsbotService(systemUser, None, instanceId, config) + + try: + await service.handleAudioStream(websocket, sessionId) + except WebSocketDisconnect: + logger.info(f"Bridge audio WebSocket disconnected: session={sessionId}") + except Exception as e: + logger.error(f"Bridge audio WebSocket error: session={sessionId}, error={e}") + finally: + logger.info(f"Bridge audio WebSocket closed: session={sessionId}") diff --git a/modules/features/teamsbot/service.py b/modules/features/teamsbot/service.py new file mode 100644 index 00000000..d7707e05 --- /dev/null +++ b/modules/features/teamsbot/service.py @@ -0,0 +1,549 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Teamsbot Service - Pipeline Orchestrator. +Manages the audio processing pipeline: STT -> Context Buffer -> SPEECH_TEAMS -> TTS -> Bridge. 
+""" + +import logging +import json +import asyncio +import time +import base64 +from typing import Optional, Dict, Any, List + +from fastapi import WebSocket + +from modules.datamodels.datamodelUam import User +from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum +from modules.shared.timeUtils import getUtcTimestamp + +from .datamodelTeamsbot import ( + TeamsbotSessionStatus, + TeamsbotTranscript, + TeamsbotBotResponse, + TeamsbotResponseType, + TeamsbotConfig, + TeamsbotResponseMode, + SpeechTeamsResponse, +) +from .bridgeConnector import BridgeConnector + +logger = logging.getLogger(__name__) + +# ========================================================================= +# Session Event Queues (for SSE streaming to frontend) +# ========================================================================= +_sessionEvents: Dict[str, asyncio.Queue] = {} + + +async def _emitSessionEvent(sessionId: str, eventType: str, data: Any): + """Emit an event to the session's SSE stream.""" + eventQueue = _sessionEvents.get(sessionId) + if eventQueue: + await eventQueue.put({"type": eventType, "data": data, "timestamp": getUtcTimestamp()}) + + +class TeamsbotService: + """ + Pipeline Orchestrator for Teams Bot sessions. + Coordinates VoiceObjects (STT/TTS), AiService (SPEECH_TEAMS), and Bridge communication. 
+ """ + + def __init__(self, currentUser: User, mandateId: str, instanceId: str, config: TeamsbotConfig): + self.currentUser = currentUser + self.mandateId = mandateId + self.instanceId = instanceId + self.config = config + self.bridgeConnector = BridgeConnector(config.bridgeUrl) + + # State + self._lastAiCallTime: float = 0.0 + self._contextBuffer: List[Dict[str, Any]] = [] + + # ========================================================================= + # Session Lifecycle + # ========================================================================= + + async def joinMeeting( + self, + sessionId: str, + meetingLink: str, + connectionId: Optional[str] = None, + gatewayBaseUrl: str = "", + ): + """Send join command to the .NET Media Bridge. + + Args: + gatewayBaseUrl: Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net). + Passed to the bridge so it can build full callback/WS URLs per-session. + """ + from . import interfaceFeatureTeamsbot as interfaceDb + + interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) + + # Initialize SSE event queue + _sessionEvents[sessionId] = asyncio.Queue() + + try: + # Update status to JOINING + interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.JOINING.value}) + await _emitSessionEvent(sessionId, "statusChange", {"status": "joining"}) + + # Send join command to bridge + session = interface.getSession(sessionId) + if not session: + raise ValueError(f"Session {sessionId} not found") + + result = await self.bridgeConnector.joinMeeting( + meetingLink=meetingLink, + botName=session.get("botName", self.config.botName), + backgroundImageUrl=session.get("backgroundImageUrl"), + sessionId=sessionId, + gatewayCallbackUrl=f"/api/teamsbot/{self.instanceId}/bridge/status", + gatewayWsUrl=f"/api/teamsbot/{self.instanceId}/bridge/audio/{sessionId}", + gatewayBaseUrl=gatewayBaseUrl, + ) + + if result.get("success"): + bridgeSessionId = result.get("bridgeSessionId") + 
interface.updateSession(sessionId, { + "bridgeSessionId": bridgeSessionId, + "status": TeamsbotSessionStatus.ACTIVE.value, + "startedAt": getUtcTimestamp(), + }) + await _emitSessionEvent(sessionId, "statusChange", {"status": "active"}) + logger.info(f"Bot joined meeting for session {sessionId}") + else: + errorMsg = result.get("error", "Unknown error joining meeting") + interface.updateSession(sessionId, { + "status": TeamsbotSessionStatus.ERROR.value, + "errorMessage": errorMsg, + }) + await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": errorMsg}) + logger.error(f"Failed to join meeting for session {sessionId}: {errorMsg}") + + except Exception as e: + logger.error(f"Error joining meeting for session {sessionId}: {e}") + interface.updateSession(sessionId, { + "status": TeamsbotSessionStatus.ERROR.value, + "errorMessage": str(e), + }) + await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)}) + + async def leaveMeeting(self, sessionId: str): + """Send leave command to the .NET Media Bridge.""" + from . 
import interfaceFeatureTeamsbot as interfaceDb + + interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) + + try: + interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.LEAVING.value}) + await _emitSessionEvent(sessionId, "statusChange", {"status": "leaving"}) + + await self.bridgeConnector.leaveMeeting(sessionId) + + interface.updateSession(sessionId, { + "status": TeamsbotSessionStatus.ENDED.value, + "endedAt": getUtcTimestamp(), + }) + await _emitSessionEvent(sessionId, "statusChange", {"status": "ended"}) + + # Generate meeting summary in background + asyncio.create_task(self._generateMeetingSummary(sessionId)) + + logger.info(f"Bot left meeting for session {sessionId}") + + except Exception as e: + logger.error(f"Error leaving meeting for session {sessionId}: {e}") + interface.updateSession(sessionId, { + "status": TeamsbotSessionStatus.ERROR.value, + "errorMessage": str(e), + "endedAt": getUtcTimestamp(), + }) + + # Cleanup event queue + _sessionEvents.pop(sessionId, None) + + # ========================================================================= + # Audio Processing Pipeline + # ========================================================================= + + async def handleAudioStream(self, websocket: WebSocket, sessionId: str): + """ + Main audio processing loop. + Receives PCM audio from bridge via WebSocket, processes through STT -> AI -> TTS pipeline. + """ + from . 
import interfaceFeatureTeamsbot as interfaceDb + from modules.interfaces.interfaceVoiceObjects import getVoiceInterface + + interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) + voiceInterface = getVoiceInterface(self.currentUser, self.mandateId) + + audioBuffer = bytearray() + bufferDurationMs = 0 + targetBufferMs = 1500 # Buffer 1.5 seconds of audio before STT + + # PCM16 at 16kHz mono = 32000 bytes/second + bytesPerSecond = 32000 + bytesPerMs = bytesPerSecond / 1000 + + logger.info(f"Audio processing started for session {sessionId}") + + try: + while True: + # Receive audio data from bridge + data = await websocket.receive() + + if "bytes" in data: + audioChunk = data["bytes"] + elif "text" in data: + # JSON message (control messages) + message = json.loads(data["text"]) + msgType = message.get("type") + + if msgType == "audio_chunk": + audioChunk = base64.b64decode(message.get("data", "")) + elif msgType == "session_ended": + logger.info(f"Bridge signaled session ended: {sessionId}") + break + elif msgType == "ping": + await websocket.send_text(json.dumps({"type": "pong"})) + continue + else: + continue + else: + continue + + # Accumulate audio in buffer + audioBuffer.extend(audioChunk) + bufferDurationMs = len(audioBuffer) / bytesPerMs + + # Process when buffer has enough audio + if bufferDurationMs >= targetBufferMs: + await self._processAudioBuffer( + bytes(audioBuffer), + sessionId, + interface, + voiceInterface, + websocket, + ) + audioBuffer.clear() + bufferDurationMs = 0 + + except Exception as e: + if "disconnect" not in str(e).lower(): + logger.error(f"Audio stream error for session {sessionId}: {e}") + + # Process remaining buffer + if len(audioBuffer) > 0: + await self._processAudioBuffer( + bytes(audioBuffer), + sessionId, + interface, + voiceInterface, + websocket, + ) + + logger.info(f"Audio processing ended for session {sessionId}") + + async def _processAudioBuffer( + self, + audioBytes: bytes, + sessionId: 
str, + interface, + voiceInterface, + websocket: WebSocket, + ): + """Process a buffered audio chunk through the STT -> AI -> TTS pipeline.""" + + # Step 1: STT -- convert audio to text + try: + sttResult = voiceInterface.speechToText( + audioContent=audioBytes, + language=self.config.language, + sampleRate=16000, + channels=1 + ) + except Exception as e: + logger.warning(f"STT failed for session {sessionId}: {e}") + return + + transcriptText = sttResult.get("text", "").strip() if isinstance(sttResult, dict) else str(sttResult).strip() + if not transcriptText: + return + + confidence = sttResult.get("confidence", 0.0) if isinstance(sttResult, dict) else 0.0 + speaker = sttResult.get("speaker") if isinstance(sttResult, dict) else None + + # Store transcript segment + transcriptData = TeamsbotTranscript( + sessionId=sessionId, + speaker=speaker, + text=transcriptText, + timestamp=getUtcTimestamp(), + confidence=confidence, + language=self.config.language, + isFinal=True, + ).model_dump() + + createdTranscript = interface.createTranscript(transcriptData) + + # Update context buffer + self._contextBuffer.append({ + "speaker": speaker or "Unknown", + "text": transcriptText, + "timestamp": getUtcTimestamp(), + }) + # Keep only last N segments + maxSegments = self.config.contextWindowSegments + if len(self._contextBuffer) > maxSegments: + self._contextBuffer = self._contextBuffer[-maxSegments:] + + # Emit SSE event for live transcript + await _emitSessionEvent(sessionId, "transcript", { + "id": createdTranscript.get("id"), + "speaker": speaker, + "text": transcriptText, + "confidence": confidence, + "timestamp": getUtcTimestamp(), + }) + + # Update session transcript count + session = interface.getSession(sessionId) + if session: + count = session.get("transcriptSegmentCount", 0) + 1 + interface.updateSession(sessionId, {"transcriptSegmentCount": count}) + + # Step 2: Check if AI analysis should be triggered + if self.config.responseMode == 
TeamsbotResponseMode.TRANSCRIBE_ONLY: + return + + if not self._shouldTriggerAnalysis(transcriptText): + return + + # Step 3: SPEECH_TEAMS AI analysis + await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript) + + def _shouldTriggerAnalysis(self, transcriptText: str) -> bool: + """ + Decide whether to trigger AI analysis based on the latest transcript. + Triggers: + - Bot name mentioned (immediate) + - Periodic interval elapsed + - Cooldown respected + """ + now = time.time() + + # Cooldown check + timeSinceLastCall = now - self._lastAiCallTime + if timeSinceLastCall < self.config.triggerCooldownSeconds: + return False + + # Bot name mentioned -> immediate trigger + botNameLower = self.config.botName.lower() + if botNameLower in transcriptText.lower(): + logger.debug(f"Trigger: Bot name '{self.config.botName}' detected in transcript") + return True + + # Periodic trigger + if timeSinceLastCall >= self.config.triggerIntervalSeconds: + logger.debug(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed") + return True + + return False + + async def _analyzeAndRespond( + self, + sessionId: str, + interface, + voiceInterface, + websocket: WebSocket, + triggerTranscript: Dict[str, Any], + ): + """Run SPEECH_TEAMS AI analysis and respond if needed.""" + self._lastAiCallTime = time.time() + + # Build transcript context from buffer + contextLines = [] + for segment in self._contextBuffer: + speaker = segment.get("speaker", "Unknown") + text = segment.get("text", "") + contextLines.append(f"[{speaker}]: {text}") + + transcriptContext = f"BOT_NAME:{self.config.botName}\n" + "\n".join(contextLines) + + # Call SPEECH_TEAMS + try: + from modules.services.serviceAi.mainServiceAi import AiService + + # Create AiService with service center context + # Note: In production, serviceCenter should be passed properly + aiService = AiService(serviceCenter=None) + await aiService.ensureAiObjectsInitialized() + + request = 
AiCallRequest( + prompt=self.config.aiSystemPrompt, + context=transcriptContext, + options=AiCallOptions( + operationType=OperationTypeEnum.SPEECH_TEAMS, + priority=PriorityEnum.SPEED, + ) + ) + + response = await aiService.callAi(request) + + # Parse structured response + try: + speechResult = SpeechTeamsResponse.model_validate_json(response.content) + except Exception: + # Try to extract JSON from response content + try: + jsonStr = response.content + if "```json" in jsonStr: + jsonStr = jsonStr.split("```json")[1].split("```")[0] + elif "```" in jsonStr: + jsonStr = jsonStr.split("```")[1].split("```")[0] + speechResult = SpeechTeamsResponse.model_validate_json(jsonStr.strip()) + except Exception as parseErr: + logger.warning(f"Failed to parse SPEECH_TEAMS response: {parseErr}") + speechResult = SpeechTeamsResponse( + shouldRespond=False, + reasoning=f"Parse error: {str(parseErr)[:100]}", + detectedIntent="none" + ) + + logger.info( + f"SPEECH_TEAMS result: shouldRespond={speechResult.shouldRespond}, " + f"intent={speechResult.detectedIntent}, " + f"reasoning={speechResult.reasoning[:80]}..." 
+ ) + + # Emit analysis event (always, for debug/UI) + await _emitSessionEvent(sessionId, "analysis", { + "shouldRespond": speechResult.shouldRespond, + "detectedIntent": speechResult.detectedIntent, + "reasoning": speechResult.reasoning, + "modelName": response.modelName, + "processingTime": response.processingTime, + "priceCHF": response.priceCHF, + }) + + # Step 4: Respond if AI decided to + if speechResult.shouldRespond and speechResult.responseText: + + if self.config.responseMode == TeamsbotResponseMode.MANUAL: + # In manual mode, suggest but don't send + await _emitSessionEvent(sessionId, "suggestedResponse", { + "responseText": speechResult.responseText, + "detectedIntent": speechResult.detectedIntent, + "reasoning": speechResult.reasoning, + }) + return + + # Auto mode: send voice + chat response + responseType = TeamsbotResponseType.BOTH + + # 4a: TTS -> Audio to bridge + try: + ttsResult = voiceInterface.textToSpeech( + text=speechResult.responseText, + languageCode=self.config.language, + voiceName=self.config.voiceId + ) + + if ttsResult and isinstance(ttsResult, dict): + audioContent = ttsResult.get("audioContent") + if audioContent: + # Send TTS audio to bridge + await websocket.send_text(json.dumps({ + "type": "tts_audio", + "data": base64.b64encode(audioContent if isinstance(audioContent, bytes) else audioContent.encode()).decode(), + "format": "mp3", + })) + except Exception as ttsErr: + logger.warning(f"TTS failed for session {sessionId}: {ttsErr}") + responseType = TeamsbotResponseType.CHAT # Fallback to chat only + + # 4b: Store bot response + botResponseData = TeamsbotBotResponse( + sessionId=sessionId, + responseText=speechResult.responseText, + responseType=responseType, + detectedIntent=speechResult.detectedIntent, + reasoning=speechResult.reasoning, + triggeredByTranscriptId=triggerTranscript.get("id"), + modelName=response.modelName, + processingTime=response.processingTime, + priceCHF=response.priceCHF, + timestamp=getUtcTimestamp(), + 
).model_dump() + + createdResponse = interface.createBotResponse(botResponseData) + + # 4c: Emit SSE event + await _emitSessionEvent(sessionId, "botResponse", { + "id": createdResponse.get("id"), + "responseText": speechResult.responseText, + "responseType": responseType.value, + "detectedIntent": speechResult.detectedIntent, + "reasoning": speechResult.reasoning, + "modelName": response.modelName, + "processingTime": response.processingTime, + "priceCHF": response.priceCHF, + }) + + # Update session response count + session = interface.getSession(sessionId) + if session: + count = session.get("botResponseCount", 0) + 1 + interface.updateSession(sessionId, {"botResponseCount": count}) + + logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}") + + except Exception as e: + logger.error(f"SPEECH_TEAMS analysis failed for session {sessionId}: {e}") + await _emitSessionEvent(sessionId, "error", {"message": f"AI analysis failed: {str(e)}"}) + + # ========================================================================= + # Meeting Summary + # ========================================================================= + + async def _generateMeetingSummary(self, sessionId: str): + """Generate an AI summary of the meeting after it ends.""" + try: + from . import interfaceFeatureTeamsbot as interfaceDb + + interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) + transcripts = interface.getTranscripts(sessionId) + + if not transcripts or len(transcripts) < 5: + return # Not enough content for a summary + + # Build full transcript + fullTranscript = "\n".join( + f"[{t.get('speaker', 'Unknown')}]: {t.get('text', '')}" + for t in transcripts + ) + + from modules.services.serviceAi.mainServiceAi import AiService + + aiService = AiService(serviceCenter=None) + await aiService.ensureAiObjectsInitialized() + + request = AiCallRequest( + prompt="Erstelle eine kurze Zusammenfassung dieses Meeting-Transkripts. 
Nenne die wichtigsten Punkte, Entscheidungen und offene Aktionspunkte.", + context=fullTranscript, + options=AiCallOptions( + operationType=OperationTypeEnum.DATA_ANALYSE, + priority=PriorityEnum.BALANCED, + ) + ) + + response = await aiService.callAi(request) + if response and response.errorCount == 0: + interface.updateSession(sessionId, {"summary": response.content}) + logger.info(f"Meeting summary generated for session {sessionId}") + + except Exception as e: + logger.error(f"Failed to generate meeting summary for session {sessionId}: {e}") diff --git a/modules/routes/routeSecurityMsft.py b/modules/routes/routeSecurityMsft.py index 338e2e33..face29d4 100644 --- a/modules/routes/routeSecurityMsft.py +++ b/modules/routes/routeSecurityMsft.py @@ -58,9 +58,13 @@ SCOPES = [ "Mail.ReadWrite", # Read and write mail "Mail.Send", # Send mail "Files.ReadWrite.All", # Read and write files (SharePoint/OneDrive) - "Sites.ReadWrite.All" # Read and write SharePoint sites + "Sites.ReadWrite.All", # Read and write SharePoint sites + # Teams Bot: Meeting and chat access (requires admin consent) + "OnlineMeetings.Read.All", # Read Teams meeting details + "Chat.ReadWrite", # Read and write Teams chat messages + "ChatMessage.Send", # Send messages to Teams meeting chats ] -# NOTE: Sites.ReadWrite.All and Files.ReadWrite.All require admin consent. +# NOTE: Sites.ReadWrite.All, Files.ReadWrite.All, and Teams scopes require admin consent. # An admin must grant consent ONCE at: /api/msft/adminconsent # After that, all users can connect without individual admin approval. diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index bd3ef6b4..1247ab70 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -95,6 +95,10 @@ class AiService: 3. 
async def _handleSpeechTeams(self, request: AiCallRequest):
    """
    Handle a SPEECH_TEAMS request with a dedicated single-model pipeline.

    Standard model selection and failover are bypassed: one model is picked
    via _getSpeechTeamsModel() and called exactly once. Failures never raise
    to the caller; they are reported as an AiCallResponse with errorCount=1
    whose content is a JSON "do not respond" decision.

    Steps:
        1. Run the billing pre-flight checks.
        2. Pick the fixed model.
        3. Build the system prompt (fixed base prompt + request.prompt).
        4. Call the model once, then record billing and call statistics.

    Args:
        request: AiCallRequest with:
            - prompt: user-configured system prompt instructions
            - context: transcript text to analyze. The bot name may be passed
              as a first line of the form "BOT_NAME:<name>"; that line is
              removed before the transcript is sent to the model.

    Returns:
        AiCallResponse whose content is the model output (expected to be a
        JSON string in SpeechTeamsResponse format).
    """
    from modules.datamodels.datamodelAi import AiCallResponse, AiModelCall, AiCallOptions, PriorityEnum

    defaultBotName = "AI Assistant"
    botNameHeader = "BOT_NAME:"
    startTime = time.time()

    def buildSilentResponse(reasoning, modelName, provider, processingTime):
        # Error-path result: a well-formed "do not respond" decision so the
        # caller's JSON parsing keeps working when no model output exists.
        return AiCallResponse(
            content=json.dumps({"shouldRespond": False, "responseText": None, "reasoning": reasoning, "detectedIntent": "none"}),
            modelName=modelName,
            provider=provider,
            priceCHF=0.0,
            processingTime=processingTime,
            bytesSent=0,
            bytesReceived=0,
            errorCount=1
        )

    # Billing pre-flight (SPEECH_TEAMS also needs billing)
    self._preflightBillingCheck()
    await self._checkBillingBeforeAiCall()

    # 1. Select a fixed fast model (bypass model selector)
    model = self._getSpeechTeamsModel()
    if not model:
        return buildSilentResponse("No suitable model available for SPEECH_TEAMS", "error", "unknown", 0.0)

    # 2. Resolve the bot name from the optional "BOT_NAME:" header line.
    botName = defaultBotName
    contextText = request.context or ""
    if contextText.startswith(botNameHeader):
        headerLine, _, contextText = contextText.partition("\n")
        # Strip only the leading marker; an empty header keeps the default name.
        botName = headerLine[len(botNameHeader):].strip() or defaultBotName

    systemPrompt = self._buildSpeechTeamsSystemPrompt(request.prompt or "", botName)

    # 3. Build messages
    messages = [
        {"role": "system", "content": systemPrompt},
        {"role": "user", "content": contextText}
    ]

    # 4. Call model directly (no failover loop -- single fast model)
    modelCall = AiModelCall(
        messages=messages,
        model=model,
        options=AiCallOptions(
            operationType=OperationTypeEnum.SPEECH_TEAMS,
            priority=PriorityEnum.SPEED,
            temperature=0.3,
            resultFormat="json"
        )
    )

    # Set billing callback; it is invoked manually below because the model is
    # called directly here.
    # NOTE(review): confirm model.functionCall does not also trigger
    # aiObjects.billingCallback, otherwise this call would be billed twice.
    self.aiObjects.billingCallback = self._createBillingCallback()

    try:
        inputBytes = len((systemPrompt + contextText).encode('utf-8'))
        modelResponse = await model.functionCall(modelCall)

        if not modelResponse.success:
            raise ValueError(f"SPEECH_TEAMS model call failed: {modelResponse.error}")

        content = modelResponse.content
        if content is None:
            # Report a clear reason instead of an AttributeError from .encode()
            raise ValueError("SPEECH_TEAMS model call returned no content")

        outputBytes = len(content.encode('utf-8'))
        processingTime = time.time() - startTime
        priceCHF = model.calculatepriceCHF(processingTime, inputBytes, outputBytes)

        response = AiCallResponse(
            content=content,
            modelName=model.name,
            provider=model.connectorType,
            priceCHF=priceCHF,
            processingTime=processingTime,
            bytesSent=inputBytes,
            bytesReceived=outputBytes,
            errorCount=0
        )

        # Record billing; a billing failure must not discard the model result.
        if self.aiObjects.billingCallback:
            try:
                self.aiObjects.billingCallback(response)
            except Exception as e:
                logger.error(f"BILLING: Failed to record billing for SPEECH_TEAMS: {e}")

        # Store stats
        self._storeAiCallStats(response, request)

        logger.info(f"SPEECH_TEAMS call completed: model={model.name}, time={processingTime:.2f}s, cost={priceCHF:.4f} CHF")
        return response

    except Exception as e:
        # Boundary handler: convert any failure into a silent decision.
        logger.error(f"SPEECH_TEAMS call failed: {e}")
        return buildSilentResponse(f"Error: {str(e)}", model.name, model.connectorType, time.time() - startTime)
    finally:
        self.aiObjects.billingCallback = None
def _getSpeechTeamsModel(self):
    """
    Pick the single model used for SPEECH_TEAMS calls.

    Only models with a truthy functionCall and a truthy isAvailable flag are
    considered. Selection order:
        1. The first preferred name (gpt-4o-mini, claude-3-5-haiku, gpt-4o,
           claude-sonnet-4-5, in that order) that is a substring of a
           model's lower-cased name.
        2. Among models that list the DATA_ANALYSE operation type: highest
           speedRating, ties broken by lowest costPer1kTokensInput.
        3. The first usable model in registry order.

    Returns:
        The selected model object, or None when the registry returns no
        models or none of them is usable.
    """
    from modules.aicore.aicoreModelRegistry import modelRegistry

    availableModels = modelRegistry.getAvailableModels()
    if not availableModels:
        logger.error("SPEECH_TEAMS: No models available in registry")
        return None

    # Every selection stage applies the same usability filter, so do it once.
    usableModels = [m for m in availableModels if m.functionCall and m.isAvailable]

    # Priority list of preferred models for SPEECH_TEAMS (fast + cheap).
    # "gpt-4o" also matches "gpt-4o-mini" by substring, so order matters.
    preferredModelNames = (
        "gpt-4o-mini",        # OpenAI: fast, cheap, good at JSON
        "claude-3-5-haiku",   # Anthropic: fast, cheap
        "gpt-4o",             # OpenAI: fallback to quality model
        "claude-sonnet-4-5",  # Anthropic: fallback
    )

    # Try preferred models in order
    for preferredName in preferredModelNames:
        for model in usableModels:
            if preferredName in model.name.lower():
                logger.info(f"SPEECH_TEAMS: Selected preferred model '{model.name}' ({model.displayName})")
                return model

    # Fallback: pick fastest available model with DATA_ANALYSE capability
    dataAnalyseModels = [
        model for model in usableModels
        if any(opRating.operationType == OperationTypeEnum.DATA_ANALYSE for opRating in model.operationTypes)
    ]
    if dataAnalyseModels:
        # min() returns the first minimal item, i.e. the same element a stable
        # sort by (speed descending, cost ascending) would put first.
        bestModel = min(dataAnalyseModels, key=lambda m: (-m.speedRating, m.costPer1kTokensInput))
        logger.info(f"SPEECH_TEAMS: Selected fallback model '{bestModel.name}' (speed={bestModel.speedRating})")
        return bestModel

    # Last resort: first available model
    if usableModels:
        lastResortModel = usableModels[0]
        logger.warning(f"SPEECH_TEAMS: Using last-resort model '{lastResortModel.name}'")
        return lastResortModel

    return None
_buildSpeechTeamsSystemPrompt(self, userSystemPrompt: str, botName: str) -> str: + """ + Build the specialized system prompt for SPEECH_TEAMS meeting analysis. + Combines a fixed base prompt with user-configurable instructions. + """ + basePrompt = f"""Du bist "{botName}", ein AI-Teilnehmer in einem Microsoft Teams Meeting. +Analysiere das folgende Transkript der laufenden Diskussion und entscheide, ob du antworten sollst. + +ANTWORTE NUR wenn: +1. Du direkt angesprochen wirst (z.B. "{botName}, was denkst du?" oder "Hey {botName}") +2. Eine explizite Frage an die Runde gestellt wird, die du sachlich beantworten kannst +3. Du einen wertvollen, nicht-offensichtlichen Beitrag zur Diskussion hast + +ANTWORTE NICHT wenn: +- Die Teilnehmer normal miteinander sprechen ohne dich einzubeziehen +- Die Diskussion keinen Input von dir benoetigt +- Du nur wiederholen wuerdest, was bereits gesagt wurde +- Es sich um Smalltalk oder Begruessung handelt + +ANTWORT-STIL: +- Halte dich kurz und praezise (max 2-3 Saetze fuer Sprache) +- Sei professionell aber natuerlich +- Beziehe dich konkret auf das Gesagte""" + + # Append user-configured instructions if provided + if userSystemPrompt and userSystemPrompt.strip(): + basePrompt += f"\n\nZUSAETZLICHE ANWEISUNGEN:\n{userSystemPrompt.strip()}" + + basePrompt += f""" + +WICHTIG: Antworte IMMER als valides JSON in exakt diesem Format: +{{ + "shouldRespond": true/false, + "responseText": "Deine Antwort hier" oder null, + "reasoning": "Kurze Begruendung deiner Entscheidung", + "detectedIntent": "addressed" | "question" | "proactive" | "none" +}} + +detectedIntent-Werte: +- "addressed": {botName} wurde direkt angesprochen +- "question": Eine allgemeine Frage wurde gestellt +- "proactive": Du hast einen wertvollen proaktiven Beitrag +- "none": Kein Handlungsbedarf""" + + return basePrompt def _preflightBillingCheck(self) -> None: """