425 lines
15 KiB
Python
425 lines
15 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Teamsbot routes for the backend API.
|
|
Implements Teams Bot session management, live streaming, and configuration endpoints.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import asyncio
|
|
from typing import Optional
|
|
from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, Request
|
|
from fastapi.responses import StreamingResponse
|
|
|
|
# Import auth modules
|
|
from modules.auth import limiter, getRequestContext, RequestContext
|
|
|
|
# Import interfaces
|
|
from . import interfaceFeatureTeamsbot as interfaceDb
|
|
from modules.interfaces.interfaceDbApp import getRootInterface
|
|
from modules.interfaces.interfaceFeatures import getFeatureInterface
|
|
|
|
# Import models
|
|
from .datamodelTeamsbot import (
|
|
TeamsbotSession,
|
|
TeamsbotSessionStatus,
|
|
TeamsbotStartSessionRequest,
|
|
TeamsbotSessionResponse,
|
|
TeamsbotConfigUpdateRequest,
|
|
TeamsbotConfig,
|
|
)
|
|
|
|
# Import service
|
|
from .service import TeamsbotService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Create router
|
|
router = APIRouter(
|
|
prefix="/api/teamsbot",
|
|
tags=["Teamsbot"],
|
|
responses={404: {"description": "Not found"}}
|
|
)
|
|
|
|
|
|
# =========================================================================
|
|
# Helpers
|
|
# =========================================================================
|
|
|
|
def _getInterface(context: RequestContext, instanceId: Optional[str] = None):
|
|
"""Get teamsbot interface with instance context."""
|
|
mandateId = str(context.mandateId) if context.mandateId else None
|
|
return interfaceDb.getInterface(
|
|
context.user,
|
|
mandateId=mandateId,
|
|
featureInstanceId=instanceId
|
|
)
|
|
|
|
|
|
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
|
|
"""Validate user access to the feature instance. Returns mandateId."""
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
|
|
instance = featureInterface.getFeatureInstance(instanceId)
|
|
if not instance:
|
|
raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found")
|
|
|
|
mandateId = instance.get("mandateId") if isinstance(instance, dict) else getattr(instance, "mandateId", None)
|
|
if not mandateId:
|
|
raise HTTPException(status_code=500, detail="Feature instance has no mandateId")
|
|
|
|
return str(mandateId)
|
|
|
|
|
|
def _getInstanceConfig(instanceId: str) -> TeamsbotConfig:
|
|
"""Load TeamsbotConfig from FeatureInstance.config JSONB field."""
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
instance = featureInterface.getFeatureInstance(instanceId)
|
|
|
|
if not instance:
|
|
return TeamsbotConfig()
|
|
|
|
configDict = instance.get("config") if isinstance(instance, dict) else getattr(instance, "config", None)
|
|
if configDict and isinstance(configDict, dict):
|
|
try:
|
|
return TeamsbotConfig(**configDict)
|
|
except Exception:
|
|
pass
|
|
return TeamsbotConfig()
|
|
|
|
|
|
# =========================================================================
|
|
# Session Endpoints
|
|
# =========================================================================
|
|
|
|
@router.post("/{instanceId}/sessions")
|
|
@limiter.limit("10/minute")
|
|
async def startSession(
|
|
request: Request,
|
|
instanceId: str,
|
|
body: TeamsbotStartSessionRequest,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Start a new Teams Bot session -- bot joins the specified meeting."""
|
|
mandateId = _validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
config = _getInstanceConfig(instanceId)
|
|
|
|
# Create session
|
|
sessionData = TeamsbotSession(
|
|
instanceId=instanceId,
|
|
mandateId=mandateId,
|
|
meetingLink=body.meetingLink,
|
|
botName=body.botName or config.botName,
|
|
backgroundImageUrl=body.backgroundImageUrl or config.backgroundImageUrl,
|
|
status=TeamsbotSessionStatus.PENDING,
|
|
startedByUserId=str(context.user.id),
|
|
).model_dump()
|
|
|
|
createdSession = interface.createSession(sessionData)
|
|
sessionId = createdSession.get("id")
|
|
|
|
# Derive gateway base URL from the incoming request so the bridge
|
|
# can build full callback/WS URLs targeting this specific gateway instance.
|
|
gatewayBaseUrl = str(request.base_url).rstrip("/")
|
|
|
|
# Start the bot in background (join meeting via bridge)
|
|
service = TeamsbotService(context.user, mandateId, instanceId, config)
|
|
asyncio.create_task(
|
|
service.joinMeeting(sessionId, body.meetingLink, body.connectionId, gatewayBaseUrl)
|
|
)
|
|
|
|
logger.info(f"Teamsbot session {sessionId} created for instance {instanceId}")
|
|
return {"session": createdSession}
|
|
|
|
|
|
@router.get("/{instanceId}/sessions")
|
|
@limiter.limit("30/minute")
|
|
async def listSessions(
|
|
request: Request,
|
|
instanceId: str,
|
|
includeEnded: bool = Query(True, description="Include ended sessions"),
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""List all sessions for a feature instance."""
|
|
_validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
sessions = interface.getSessions(instanceId, includeEnded=includeEnded)
|
|
return {"sessions": sessions}
|
|
|
|
|
|
@router.get("/{instanceId}/sessions/{sessionId}")
|
|
@limiter.limit("30/minute")
|
|
async def getSession(
|
|
request: Request,
|
|
instanceId: str,
|
|
sessionId: str,
|
|
includeTranscripts: bool = Query(True),
|
|
includeResponses: bool = Query(True),
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Get session details with optional transcripts and bot responses."""
|
|
_validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
|
|
session = interface.getSession(sessionId)
|
|
if not session:
|
|
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
|
|
|
|
result = {"session": session}
|
|
|
|
if includeTranscripts:
|
|
result["transcripts"] = interface.getTranscripts(sessionId)
|
|
|
|
if includeResponses:
|
|
result["botResponses"] = interface.getBotResponses(sessionId)
|
|
result["stats"] = interface.getSessionStats(sessionId)
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/{instanceId}/sessions/{sessionId}/stream")
|
|
@limiter.limit("10/minute")
|
|
async def streamSession(
|
|
request: Request,
|
|
instanceId: str,
|
|
sessionId: str,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""
|
|
SSE live stream for a session.
|
|
Streams transcript segments and bot responses in real-time.
|
|
Events: transcript, botResponse, statusChange, error
|
|
"""
|
|
_validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
|
|
session = interface.getSession(sessionId)
|
|
if not session:
|
|
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
|
|
|
|
async def _eventGenerator():
|
|
"""Generate SSE events from the session event queue."""
|
|
from .service import _sessionEvents
|
|
|
|
# Send initial session state
|
|
yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n"
|
|
|
|
# Stream events
|
|
eventQueue = _sessionEvents.get(sessionId)
|
|
if not eventQueue:
|
|
_sessionEvents[sessionId] = asyncio.Queue()
|
|
eventQueue = _sessionEvents[sessionId]
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
event = await asyncio.wait_for(eventQueue.get(), timeout=30.0)
|
|
yield f"data: {json.dumps(event)}\n\n"
|
|
|
|
# Stop streaming if session ended
|
|
if event.get("type") == "statusChange" and event.get("data", {}).get("status") in ["ended", "error"]:
|
|
break
|
|
except asyncio.TimeoutError:
|
|
# Send keepalive ping
|
|
yield f"data: {json.dumps({'type': 'ping'})}\n\n"
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
return StreamingResponse(
|
|
_eventGenerator(),
|
|
media_type="text/event-stream",
|
|
headers={
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"X-Accel-Buffering": "no",
|
|
}
|
|
)
|
|
|
|
|
|
@router.post("/{instanceId}/sessions/{sessionId}/stop")
|
|
@limiter.limit("10/minute")
|
|
async def stopSession(
|
|
request: Request,
|
|
instanceId: str,
|
|
sessionId: str,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Stop an active session -- bot leaves the meeting."""
|
|
mandateId = _validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
config = _getInstanceConfig(instanceId)
|
|
|
|
session = interface.getSession(sessionId)
|
|
if not session:
|
|
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
|
|
|
|
currentStatus = session.get("status")
|
|
if currentStatus in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]:
|
|
raise HTTPException(status_code=400, detail=f"Session already in terminal state: {currentStatus}")
|
|
|
|
# Stop the bot
|
|
service = TeamsbotService(context.user, mandateId, instanceId, config)
|
|
await service.leaveMeeting(sessionId)
|
|
|
|
logger.info(f"Teamsbot session {sessionId} stop requested")
|
|
return {"status": "stopping", "sessionId": sessionId}
|
|
|
|
|
|
@router.delete("/{instanceId}/sessions/{sessionId}")
|
|
@limiter.limit("10/minute")
|
|
async def deleteSession(
|
|
request: Request,
|
|
instanceId: str,
|
|
sessionId: str,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Delete a session and all related data."""
|
|
_validateInstanceAccess(instanceId, context)
|
|
interface = _getInterface(context, instanceId)
|
|
|
|
session = interface.getSession(sessionId)
|
|
if not session:
|
|
raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
|
|
|
|
# Don't delete active sessions
|
|
currentStatus = session.get("status")
|
|
if currentStatus in [TeamsbotSessionStatus.ACTIVE.value, TeamsbotSessionStatus.JOINING.value]:
|
|
raise HTTPException(status_code=400, detail="Cannot delete an active session. Stop it first.")
|
|
|
|
interface.deleteSession(sessionId)
|
|
logger.info(f"Teamsbot session {sessionId} deleted")
|
|
return {"deleted": True, "sessionId": sessionId}
|
|
|
|
|
|
# =========================================================================
|
|
# Configuration Endpoints
|
|
# =========================================================================
|
|
|
|
@router.get("/{instanceId}/config")
|
|
@limiter.limit("30/minute")
|
|
async def getConfig(
|
|
request: Request,
|
|
instanceId: str,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Get the teamsbot configuration for a feature instance."""
|
|
_validateInstanceAccess(instanceId, context)
|
|
config = _getInstanceConfig(instanceId)
|
|
return {"config": config.model_dump()}
|
|
|
|
|
|
@router.put("/{instanceId}/config")
|
|
@limiter.limit("10/minute")
|
|
async def updateConfig(
|
|
request: Request,
|
|
instanceId: str,
|
|
configUpdate: TeamsbotConfigUpdateRequest,
|
|
context: RequestContext = Depends(getRequestContext),
|
|
):
|
|
"""Update the teamsbot configuration."""
|
|
mandateId = _validateInstanceAccess(instanceId, context)
|
|
|
|
# Load current config and merge updates
|
|
currentConfig = _getInstanceConfig(instanceId)
|
|
updateDict = configUpdate.model_dump(exclude_none=True)
|
|
mergedConfig = currentConfig.model_copy(update=updateDict)
|
|
|
|
# Save to FeatureInstance.config
|
|
rootInterface = getRootInterface()
|
|
featureInterface = getFeatureInterface(rootInterface.db)
|
|
featureInterface.updateFeatureInstanceConfig(instanceId, mergedConfig.model_dump())
|
|
|
|
logger.info(f"Teamsbot config updated for instance {instanceId}: {list(updateDict.keys())}")
|
|
return {"config": mergedConfig.model_dump()}
|
|
|
|
|
|
# =========================================================================
|
|
# Bridge Communication Endpoints (called by .NET Media Bridge)
|
|
# =========================================================================
|
|
|
|
@router.post("/{instanceId}/bridge/status")
|
|
async def bridgeStatusCallback(
|
|
instanceId: str,
|
|
statusData: dict,
|
|
):
|
|
"""
|
|
Callback endpoint for the .NET Media Bridge to report status updates.
|
|
Called when: bot joins/leaves meeting, errors occur, etc.
|
|
Note: No user auth -- authenticated via bridge API key.
|
|
"""
|
|
sessionId = statusData.get("sessionId")
|
|
status = statusData.get("status")
|
|
errorMessage = statusData.get("errorMessage")
|
|
|
|
if not sessionId or not status:
|
|
raise HTTPException(status_code=400, detail="Missing sessionId or status")
|
|
|
|
# TODO: Validate bridge API key from headers
|
|
|
|
logger.info(f"Bridge status callback: session={sessionId}, status={status}")
|
|
|
|
try:
|
|
# Update session status (bridge callbacks have no user context)
|
|
from modules.datamodels.datamodelUam import User
|
|
systemUser = User(id="system", username="system", email="system@internal.local")
|
|
interface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId)
|
|
|
|
updates = {"status": status}
|
|
if errorMessage:
|
|
updates["errorMessage"] = errorMessage
|
|
if status == TeamsbotSessionStatus.ACTIVE.value:
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
updates["startedAt"] = getUtcTimestamp()
|
|
elif status in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]:
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
updates["endedAt"] = getUtcTimestamp()
|
|
|
|
interface.updateSession(sessionId, updates)
|
|
|
|
# Emit SSE event
|
|
from .service import _emitSessionEvent
|
|
await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Bridge status callback processing failed: session={sessionId}, error={e}")
|
|
# Still return 200 so the bridge doesn't retry endlessly
|
|
|
|
return {"received": True}
|
|
|
|
|
|
@router.websocket("/{instanceId}/bridge/audio/{sessionId}")
|
|
async def bridgeAudioWebsocket(
|
|
websocket: WebSocket,
|
|
instanceId: str,
|
|
sessionId: str,
|
|
):
|
|
"""
|
|
Bidirectional WebSocket for audio streaming with the .NET Media Bridge.
|
|
Bridge sends: PCM audio frames (LINEAR16, 16kHz, mono)
|
|
Gateway sends: TTS audio responses
|
|
"""
|
|
await websocket.accept()
|
|
logger.info(f"Bridge audio WebSocket connected: session={sessionId}, instance={instanceId}")
|
|
|
|
# TODO: Validate bridge API key from headers/query params
|
|
|
|
try:
|
|
config = _getInstanceConfig(instanceId)
|
|
logger.info(f"Bridge audio WebSocket config loaded: session={sessionId}")
|
|
|
|
from modules.datamodels.datamodelUam import User
|
|
systemUser = User(id="system", username="system", email="system@internal.local")
|
|
service = TeamsbotService(systemUser, None, instanceId, config)
|
|
logger.info(f"Bridge audio WebSocket service created: session={sessionId}")
|
|
|
|
await service.handleAudioStream(websocket, sessionId)
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Bridge audio WebSocket disconnected: session={sessionId}")
|
|
except Exception as e:
|
|
logger.error(f"Bridge audio WebSocket error: session={sessionId}, error={e}", exc_info=True)
|
|
finally:
|
|
logger.info(f"Bridge audio WebSocket closed: session={sessionId}")
|