gateway/modules/features/teamsbot/routeFeatureTeamsbot.py
patrick-motsch b7e4efb3a3 teams bridge
2026-02-13 07:27:33 +01:00

418 lines
15 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot routes for the backend API.
Implements Teams Bot session management, live streaming, and configuration endpoints.
"""
import logging
import json
import asyncio
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, Request
from fastapi.responses import StreamingResponse
# Import auth modules
from modules.auth import limiter, getRequestContext, RequestContext
# Import interfaces
from . import interfaceFeatureTeamsbot as interfaceDb
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
# Import models
from .datamodelTeamsbot import (
TeamsbotSession,
TeamsbotSessionStatus,
TeamsbotStartSessionRequest,
TeamsbotSessionResponse,
TeamsbotConfigUpdateRequest,
TeamsbotConfig,
)
# Import service
from .service import TeamsbotService
logger = logging.getLogger(__name__)
# Create router
router = APIRouter(
prefix="/api/teamsbot",
tags=["Teamsbot"],
responses={404: {"description": "Not found"}}
)
# =========================================================================
# Helpers
# =========================================================================
def _getInterface(context: RequestContext, instanceId: Optional[str] = None):
"""Get teamsbot interface with instance context."""
mandateId = str(context.mandateId) if context.mandateId else None
return interfaceDb.getInterface(
context.user,
mandateId=mandateId,
featureInstanceId=instanceId
)
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
"""Validate user access to the feature instance. Returns mandateId."""
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(instanceId)
if not instance:
raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found")
mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None)
if not mandateId:
raise HTTPException(status_code=500, detail="Feature instance has no mandateId")
return str(mandateId)
def _getInstanceConfig(instanceId: str) -> TeamsbotConfig:
"""Load TeamsbotConfig from FeatureInstance.config JSONB field."""
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
instance = featureInterface.getFeatureInstance(instanceId)
if not instance:
return TeamsbotConfig()
configDict = instance.get("config") if isinstance(instance, dict) else getattr(instance, "config", None)
if configDict and isinstance(configDict, dict):
try:
return TeamsbotConfig(**configDict)
except Exception:
pass
return TeamsbotConfig()
# =========================================================================
# Session Endpoints
# =========================================================================
@router.post("/{instanceId}/sessions")
@limiter.limit("10/minute")
async def startSession(
request: Request,
instanceId: str,
body: TeamsbotStartSessionRequest,
context: RequestContext = Depends(getRequestContext),
):
"""Start a new Teams Bot session -- bot joins the specified meeting."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
config = _getInstanceConfig(instanceId)
# Create session
sessionData = TeamsbotSession(
instanceId=instanceId,
mandateId=mandateId,
meetingLink=body.meetingLink,
botName=body.botName or config.botName,
backgroundImageUrl=body.backgroundImageUrl or config.backgroundImageUrl,
status=TeamsbotSessionStatus.PENDING,
startedByUserId=str(context.user.id),
).model_dump()
createdSession = interface.createSession(sessionData)
sessionId = createdSession.get("id")
# Derive gateway base URL from the incoming request so the bridge
# can build full callback/WS URLs targeting this specific gateway instance.
gatewayBaseUrl = str(request.base_url).rstrip("/")
# Start the bot in background (join meeting via bridge)
service = TeamsbotService(context.user, mandateId, instanceId, config)
asyncio.create_task(
service.joinMeeting(sessionId, body.meetingLink, body.connectionId, gatewayBaseUrl)
)
logger.info(f"Teamsbot session {sessionId} created for instance {instanceId}")
return {"session": createdSession}
@router.get("/{instanceId}/sessions")
@limiter.limit("30/minute")
async def listSessions(
request: Request,
instanceId: str,
includeEnded: bool = Query(True, description="Include ended sessions"),
context: RequestContext = Depends(getRequestContext),
):
"""List all sessions for a feature instance."""
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
sessions = interface.getSessions(instanceId, includeEnded=includeEnded)
return {"sessions": sessions}
@router.get("/{instanceId}/sessions/{sessionId}")
@limiter.limit("30/minute")
async def getSession(
request: Request,
instanceId: str,
sessionId: str,
includeTranscripts: bool = Query(True),
includeResponses: bool = Query(True),
context: RequestContext = Depends(getRequestContext),
):
"""Get session details with optional transcripts and bot responses."""
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
session = interface.getSession(sessionId)
if not session:
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
result = {"session": session}
if includeTranscripts:
result["transcripts"] = interface.getTranscripts(sessionId)
if includeResponses:
result["botResponses"] = interface.getBotResponses(sessionId)
result["stats"] = interface.getSessionStats(sessionId)
return result
@router.get("/{instanceId}/sessions/{sessionId}/stream")
@limiter.limit("10/minute")
async def streamSession(
request: Request,
instanceId: str,
sessionId: str,
context: RequestContext = Depends(getRequestContext),
):
"""
SSE live stream for a session.
Streams transcript segments and bot responses in real-time.
Events: transcript, botResponse, statusChange, error
"""
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
session = interface.getSession(sessionId)
if not session:
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
async def _eventGenerator():
"""Generate SSE events from the session event queue."""
from .service import _sessionEvents
# Send initial session state
yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n"
# Stream events
eventQueue = _sessionEvents.get(sessionId)
if not eventQueue:
_sessionEvents[sessionId] = asyncio.Queue()
eventQueue = _sessionEvents[sessionId]
try:
while True:
try:
event = await asyncio.wait_for(eventQueue.get(), timeout=30.0)
yield f"data: {json.dumps(event)}\n\n"
# Stop streaming if session ended
if event.get("type") == "statusChange" and event.get("data", {}).get("status") in ["ended", "error"]:
break
except asyncio.TimeoutError:
# Send keepalive ping
yield f"data: {json.dumps({'type': 'ping'})}\n\n"
except asyncio.CancelledError:
pass
return StreamingResponse(
_eventGenerator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
@router.post("/{instanceId}/sessions/{sessionId}/stop")
@limiter.limit("10/minute")
async def stopSession(
request: Request,
instanceId: str,
sessionId: str,
context: RequestContext = Depends(getRequestContext),
):
"""Stop an active session -- bot leaves the meeting."""
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
config = _getInstanceConfig(instanceId)
session = interface.getSession(sessionId)
if not session:
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
currentStatus = session.get("status")
if currentStatus in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]:
raise HTTPException(status_code=400, detail=f"Session already in terminal state: {currentStatus}")
# Stop the bot
service = TeamsbotService(context.user, mandateId, instanceId, config)
await service.leaveMeeting(sessionId)
logger.info(f"Teamsbot session {sessionId} stop requested")
return {"status": "stopping", "sessionId": sessionId}
@router.delete("/{instanceId}/sessions/{sessionId}")
@limiter.limit("10/minute")
async def deleteSession(
request: Request,
instanceId: str,
sessionId: str,
context: RequestContext = Depends(getRequestContext),
):
"""Delete a session and all related data."""
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
session = interface.getSession(sessionId)
if not session:
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
# Don't delete active sessions
currentStatus = session.get("status")
if currentStatus in [TeamsbotSessionStatus.ACTIVE.value, TeamsbotSessionStatus.JOINING.value]:
raise HTTPException(status_code=400, detail="Cannot delete an active session. Stop it first.")
interface.deleteSession(sessionId)
logger.info(f"Teamsbot session {sessionId} deleted")
return {"deleted": True, "sessionId": sessionId}
# =========================================================================
# Configuration Endpoints
# =========================================================================
@router.get("/{instanceId}/config")
@limiter.limit("30/minute")
async def getConfig(
request: Request,
instanceId: str,
context: RequestContext = Depends(getRequestContext),
):
"""Get the teamsbot configuration for a feature instance."""
_validateInstanceAccess(instanceId, context)
config = _getInstanceConfig(instanceId)
return {"config": config.model_dump()}
@router.put("/{instanceId}/config")
@limiter.limit("10/minute")
async def updateConfig(
request: Request,
instanceId: str,
configUpdate: TeamsbotConfigUpdateRequest,
context: RequestContext = Depends(getRequestContext),
):
"""Update the teamsbot configuration."""
mandateId = _validateInstanceAccess(instanceId, context)
# Load current config and merge updates
currentConfig = _getInstanceConfig(instanceId)
updateDict = configUpdate.model_dump(exclude_none=True)
mergedConfig = currentConfig.model_copy(update=updateDict)
# Save to FeatureInstance.config
rootInterface = getRootInterface()
featureInterface = getFeatureInterface(rootInterface.db)
featureInterface.updateFeatureInstanceConfig(instanceId, mergedConfig.model_dump())
logger.info(f"Teamsbot config updated for instance {instanceId}: {list(updateDict.keys())}")
return {"config": mergedConfig.model_dump()}
# =========================================================================
# Bridge Communication Endpoints (called by .NET Media Bridge)
# =========================================================================
@router.post("/{instanceId}/bridge/status")
async def bridgeStatusCallback(
instanceId: str,
statusData: dict,
):
"""
Callback endpoint for the .NET Media Bridge to report status updates.
Called when: bot joins/leaves meeting, errors occur, etc.
Note: No user auth -- authenticated via bridge API key.
"""
sessionId = statusData.get("sessionId")
status = statusData.get("status")
errorMessage = statusData.get("errorMessage")
if not sessionId or not status:
raise HTTPException(status_code=400, detail="Missing sessionId or status")
# TODO: Validate bridge API key from headers
logger.info(f"Bridge status callback: session={sessionId}, status={status}")
# Update session status
from modules.datamodels.datamodelUam import User
systemUser = User(id="system", email="system@internal")
interface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId)
updates = {"status": status}
if errorMessage:
updates["errorMessage"] = errorMessage
if status == TeamsbotSessionStatus.ACTIVE.value:
from modules.shared.timeUtils import getUtcTimestamp
updates["startedAt"] = getUtcTimestamp()
elif status in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]:
from modules.shared.timeUtils import getUtcTimestamp
updates["endedAt"] = getUtcTimestamp()
interface.updateSession(sessionId, updates)
# Emit SSE event
from .service import _emitSessionEvent
await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage})
return {"received": True}
@router.websocket("/{instanceId}/bridge/audio/{sessionId}")
async def bridgeAudioWebsocket(
websocket: WebSocket,
instanceId: str,
sessionId: str,
):
"""
Bidirectional WebSocket for audio streaming with the .NET Media Bridge.
Bridge sends: PCM audio frames (LINEAR16, 16kHz, mono)
Gateway sends: TTS audio responses
"""
await websocket.accept()
logger.info(f"Bridge audio WebSocket connected: session={sessionId}")
# TODO: Validate bridge API key from headers/query params
config = _getInstanceConfig(instanceId)
from modules.datamodels.datamodelUam import User
systemUser = User(id="system", email="system@internal")
service = TeamsbotService(systemUser, None, instanceId, config)
try:
await service.handleAudioStream(websocket, sessionId)
except WebSocketDisconnect:
logger.info(f"Bridge audio WebSocket disconnected: session={sessionId}")
except Exception as e:
logger.error(f"Bridge audio WebSocket error: session={sessionId}, error={e}")
finally:
logger.info(f"Bridge audio WebSocket closed: session={sessionId}")