# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Teamsbot routes for the backend API. Implements Teams Bot session management, live streaming, and configuration endpoints. """ import logging import json import asyncio from typing import Optional from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, Request from fastapi.responses import StreamingResponse # Import auth modules from modules.auth import limiter, getRequestContext, RequestContext # Import interfaces from . import interfaceFeatureTeamsbot as interfaceDb from modules.interfaces.interfaceDbApp import getRootInterface from modules.interfaces.interfaceFeatures import getFeatureInterface # Import models from .datamodelTeamsbot import ( TeamsbotSession, TeamsbotSessionStatus, TeamsbotStartSessionRequest, TeamsbotSessionResponse, TeamsbotConfigUpdateRequest, TeamsbotConfig, ) # Import service from .service import TeamsbotService logger = logging.getLogger(__name__) # Create router router = APIRouter( prefix="/api/teamsbot", tags=["Teamsbot"], responses={404: {"description": "Not found"}} ) # ========================================================================= # Helpers # ========================================================================= def _getInterface(context: RequestContext, instanceId: Optional[str] = None): """Get teamsbot interface with instance context.""" mandateId = str(context.mandateId) if context.mandateId else None return interfaceDb.getInterface( context.user, mandateId=mandateId, featureInstanceId=instanceId ) def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str: """Validate user access to the feature instance. Returns mandateId.""" rootInterface = getRootInterface() featureInterface = getFeatureInterface(rootInterface.db) instance = featureInterface.getFeatureInstance(instanceId) if not instance: raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found") mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None) if not mandateId: raise HTTPException(status_code=500, detail="Feature instance has no mandateId") return str(mandateId) def _getInstanceConfig(instanceId: str) -> TeamsbotConfig: """Load TeamsbotConfig from FeatureInstance.config JSONB field.""" rootInterface = getRootInterface() featureInterface = getFeatureInterface(rootInterface.db) instance = featureInterface.getFeatureInstance(instanceId) if not instance: return TeamsbotConfig() configDict = instance.get("config") if isinstance(instance, dict) else getattr(instance, "config", None) if configDict and isinstance(configDict, dict): try: return TeamsbotConfig(**configDict) except Exception: pass return TeamsbotConfig() # ========================================================================= # Session Endpoints # ========================================================================= @router.post("/{instanceId}/sessions") @limiter.limit("10/minute") async def startSession( request: Request, instanceId: str, body: TeamsbotStartSessionRequest, context: RequestContext = Depends(getRequestContext), ): """Start a new Teams Bot session -- bot joins the specified meeting.""" mandateId = _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) config = _getInstanceConfig(instanceId) # Create session sessionData = TeamsbotSession( instanceId=instanceId, mandateId=mandateId, meetingLink=body.meetingLink, botName=body.botName or config.botName, backgroundImageUrl=body.backgroundImageUrl or config.backgroundImageUrl, status=TeamsbotSessionStatus.PENDING, startedByUserId=str(context.user.id), ).model_dump() createdSession = interface.createSession(sessionData) sessionId = createdSession.get("id") # Derive gateway base URL from the incoming request so the bridge # can build full callback/WS URLs targeting this specific gateway instance. gatewayBaseUrl = str(request.base_url).rstrip("/") # Start the bot in background (join meeting via bridge) service = TeamsbotService(context.user, mandateId, instanceId, config) asyncio.create_task( service.joinMeeting(sessionId, body.meetingLink, body.connectionId, gatewayBaseUrl) ) logger.info(f"Teamsbot session {sessionId} created for instance {instanceId}") return {"session": createdSession} @router.get("/{instanceId}/sessions") @limiter.limit("30/minute") async def listSessions( request: Request, instanceId: str, includeEnded: bool = Query(True, description="Include ended sessions"), context: RequestContext = Depends(getRequestContext), ): """List all sessions for a feature instance.""" _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) sessions = interface.getSessions(instanceId, includeEnded=includeEnded) return {"sessions": sessions} @router.get("/{instanceId}/sessions/{sessionId}") @limiter.limit("30/minute") async def getSession( request: Request, instanceId: str, sessionId: str, includeTranscripts: bool = Query(True), includeResponses: bool = Query(True), context: RequestContext = Depends(getRequestContext), ): """Get session details with optional transcripts and bot responses.""" _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) session = interface.getSession(sessionId) if not session: raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") result = {"session": session} if includeTranscripts: result["transcripts"] = interface.getTranscripts(sessionId) if includeResponses: result["botResponses"] = interface.getBotResponses(sessionId) result["stats"] = interface.getSessionStats(sessionId) return result @router.get("/{instanceId}/sessions/{sessionId}/stream") @limiter.limit("10/minute") async def streamSession( request: Request, instanceId: str, sessionId: str, context: RequestContext = Depends(getRequestContext), ): """ SSE live stream for a session. Streams transcript segments and bot responses in real-time. Events: transcript, botResponse, statusChange, error """ _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) session = interface.getSession(sessionId) if not session: raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") async def _eventGenerator(): """Generate SSE events from the session event queue.""" from .service import _sessionEvents # Send initial session state yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n" # Stream events eventQueue = _sessionEvents.get(sessionId) if not eventQueue: _sessionEvents[sessionId] = asyncio.Queue() eventQueue = _sessionEvents[sessionId] try: while True: try: event = await asyncio.wait_for(eventQueue.get(), timeout=30.0) yield f"data: {json.dumps(event)}\n\n" # Stop streaming if session ended if event.get("type") == "statusChange" and event.get("data", {}).get("status") in ["ended", "error"]: break except asyncio.TimeoutError: # Send keepalive ping yield f"data: {json.dumps({'type': 'ping'})}\n\n" except asyncio.CancelledError: pass return StreamingResponse( _eventGenerator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", } ) @router.post("/{instanceId}/sessions/{sessionId}/stop") @limiter.limit("10/minute") async def stopSession( request: Request, instanceId: str, sessionId: str, context: RequestContext = Depends(getRequestContext), ): """Stop an active session -- bot leaves the meeting.""" mandateId = _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) config = _getInstanceConfig(instanceId) session = interface.getSession(sessionId) if not session: raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") currentStatus = session.get("status") if currentStatus in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]: raise HTTPException(status_code=400, detail=f"Session already in terminal state: {currentStatus}") # Stop the bot service = TeamsbotService(context.user, mandateId, instanceId, config) await service.leaveMeeting(sessionId) logger.info(f"Teamsbot session {sessionId} stop requested") return {"status": "stopping", "sessionId": sessionId} @router.delete("/{instanceId}/sessions/{sessionId}") @limiter.limit("10/minute") async def deleteSession( request: Request, instanceId: str, sessionId: str, context: RequestContext = Depends(getRequestContext), ): """Delete a session and all related data.""" _validateInstanceAccess(instanceId, context) interface = _getInterface(context, instanceId) session = interface.getSession(sessionId) if not session: raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found") # Don't delete active sessions currentStatus = session.get("status") if currentStatus in [TeamsbotSessionStatus.ACTIVE.value, TeamsbotSessionStatus.JOINING.value]: raise HTTPException(status_code=400, detail="Cannot delete an active session. Stop it first.") interface.deleteSession(sessionId) logger.info(f"Teamsbot session {sessionId} deleted") return {"deleted": True, "sessionId": sessionId} # ========================================================================= # Configuration Endpoints # ========================================================================= @router.get("/{instanceId}/config") @limiter.limit("30/minute") async def getConfig( request: Request, instanceId: str, context: RequestContext = Depends(getRequestContext), ): """Get the teamsbot configuration for a feature instance.""" _validateInstanceAccess(instanceId, context) config = _getInstanceConfig(instanceId) return {"config": config.model_dump()} @router.put("/{instanceId}/config") @limiter.limit("10/minute") async def updateConfig( request: Request, instanceId: str, configUpdate: TeamsbotConfigUpdateRequest, context: RequestContext = Depends(getRequestContext), ): """Update the teamsbot configuration.""" mandateId = _validateInstanceAccess(instanceId, context) # Load current config and merge updates currentConfig = _getInstanceConfig(instanceId) updateDict = configUpdate.model_dump(exclude_none=True) mergedConfig = currentConfig.model_copy(update=updateDict) # Save to FeatureInstance.config rootInterface = getRootInterface() featureInterface = getFeatureInterface(rootInterface.db) featureInterface.updateFeatureInstanceConfig(instanceId, mergedConfig.model_dump()) logger.info(f"Teamsbot config updated for instance {instanceId}: {list(updateDict.keys())}") return {"config": mergedConfig.model_dump()} # ========================================================================= # Browser Bot Communication Endpoints # ========================================================================= @router.websocket("/{instanceId}/bot/ws/{sessionId}") async def botWebsocket( websocket: WebSocket, instanceId: str, sessionId: str, ): """ Bidirectional WebSocket for communication with the Browser Bot. Bot sends: - transcript: Caption text scraped from Teams meeting - status: Bot state changes (joined, in_lobby, left, error) Gateway sends: - playAudio: TTS audio for the bot to play in the meeting """ await websocket.accept() logger.info(f"Browser Bot WebSocket connected: session={sessionId}, instance={instanceId}") # TODO: Validate bot API key from headers/query params try: config = _getInstanceConfig(instanceId) logger.info(f"Browser Bot WebSocket config loaded: session={sessionId}") # Load the original user who started the session (has RBAC roles in mandate) # Bot callbacks have no HTTP auth, so we reconstruct the user context from the session record. from modules.datamodels.datamodelUam import User from modules.interfaces.interfaceDbApp import getRootInterface systemUser = User(id="system", username="system", email="system@poweron.swiss") sessionInterface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId) session = sessionInterface.getSession(sessionId) mandateId = session.get("mandateId") if session else None startedByUserId = session.get("startedByUserId") if session else None # Look up the original user (getRootInterface uses admin context, can load any user) rootInterface = getRootInterface() originalUser = rootInterface.getUser(startedByUserId) if startedByUserId else None if not originalUser: logger.warning(f"Could not load original user {startedByUserId}, falling back to system user") originalUser = systemUser service = TeamsbotService(originalUser, mandateId, instanceId, config) logger.info(f"Browser Bot WebSocket service created: session={sessionId}, mandateId={mandateId}, user={originalUser.id}") await service.handleBotWebSocket(websocket, sessionId) except WebSocketDisconnect: logger.info(f"Browser Bot WebSocket disconnected: session={sessionId}") except Exception as e: logger.error(f"Browser Bot WebSocket error: session={sessionId}, error={e}", exc_info=True) finally: logger.info(f"Browser Bot WebSocket closed: session={sessionId}")