From c7def85a4bd948b8501b1cb2d3575aff458af9ee Mon Sep 17 00:00:00 2001 From: patrick-motsch Date: Sun, 15 Feb 2026 00:04:39 +0100 Subject: [PATCH] feat(teamsbot): Replace bridge with browser bot architecture Co-authored-by: Cursor --- env_dev.env | 6 +- env_int.env | 4 +- env_prod.env | 4 +- .../features/teamsbot/browserBotConnector.py | 149 +++++++++++ .../features/teamsbot/datamodelTeamsbot.py | 14 +- .../features/teamsbot/routeFeatureTeamsbot.py | 96 ++----- modules/features/teamsbot/service.py | 240 +++++++++--------- 7 files changed, 299 insertions(+), 214 deletions(-) create mode 100644 modules/features/teamsbot/browserBotConnector.py diff --git a/env_dev.env b/env_dev.env index a195c856..c34afb69 100644 --- a/env_dev.env +++ b/env_dev.env @@ -57,8 +57,10 @@ Connector_GoogleSpeech_API_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpETk5FWWM3Q0JKMzhI # Feature SyncDelta JIRA configuration Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpEbm0yRUJ6VUJKbUwyRW5kMnRaNW4wM2YxMkJUTXVXZUdmdVRCaUZIVHU2TTV2RWZLRmUtZkcwZE4yRUNlNDQ0aUJWYjNfdVg5YjV5c2JwMHhoUUYxZWdkeS11bXR0eGxRLWRVaVU3cUVQZWJlNDRtY1lWUDdqeDVFSlpXS0VFX21WajlRS3lHQjc0bS11akkybWV3QUFlR2hNWUNYLUdiRjZuN2dQODdDSExXWG1Dd2ZGclI2aUhlSWhETVZuY3hYdnhkb2c2LU1JTFBvWFpTNmZtMkNVOTZTejJwbDI2eGE0OS1xUlIwQnlCSmFxRFNCeVJNVzlOMDhTR1VUamx4RDRyV3p6Tk9qVHBrWWdySUM3TVRaYjd3N0JHMFhpdzFhZTNDLTFkRVQ2RVE4U19COXRhRWtNc0NVOHRqUS1CRDFpZ19xQmtFLU9YSDU3TXBZQXpVcld3PT0= -# Teamsbot Media Bridge -TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440 +# Teamsbot Browser Bot Service +# For local testing: run the bot locally with `npm run dev` in service-teams-browser-bot +# The bot will connect back to localhost:8000 via WebSocket +TEAMSBOT_BROWSER_BOT_URL = http://localhost:4100 # Debug Configuration APP_DEBUG_CHAT_WORKFLOW_ENABLED = True diff --git a/env_int.env b/env_int.env index 87cb8117..85aa7976 100644 --- a/env_int.env +++ b/env_int.env @@ -57,8 +57,8 @@ Connector_GoogleSpeech_API_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRkNmVXZ1pWcHcydTF2 # Feature SyncDelta JIRA configuration Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = INT_ENC:Z0FBQUFBQm8xSVRkTUNsWm4wX0p6eXFDZmJ4dFdHNEs1MV9MUzdrb3RzeC1jVWVYZ0REWHRyZkFiaGZLcUQtTXFBZzZkNzRmQ0gxbEhGbUNlVVFfR1JEQTc0aldkZkgyWnBOcjdlUlZxR0tDTEdKRExULXAyUEtsVmNTMkRKU1BJNnFiM0hlMXo4YndMcHlRMExtZDQ3Zm9vNFhMcEZCcHpBPT0= -# Teamsbot Media Bridge -TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440 +# Teamsbot Browser Bot Service +TEAMSBOT_BROWSER_BOT_URL = https://cae-poweron-shared.redwater-53d21339.switzerlandnorth.azurecontainerapps.io # Debug Configuration APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE diff --git a/env_prod.env b/env_prod.env index 8c85fbb6..956923f1 100644 --- a/env_prod.env +++ b/env_prod.env @@ -57,8 +57,8 @@ Connector_GoogleSpeech_API_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4NFQxaF9uN3h1cVB # Feature SyncDelta JIRA configuration Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4d3Z4d2x6N1FhUktMU0RKbkxfY2pTQkRzXzJ6UXVEbDNCaFM3UHMtQVFGYzNmYWs4N0lMM1R2SFJuZTVFVmx6MGVEbXc5U3NOTnY1TWN0ZDNaamlHQWloalM3VldmREJNSHQ1TlVkSVFJMTVhQWVGSVRMTGw4UTBqNGlQZFVuaHp4WUlKemR5UnBXZlh0REJFLXJ4ejR3PT0= -# Teamsbot Media Bridge -TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440 +# Teamsbot Browser Bot Service +TEAMSBOT_BROWSER_BOT_URL = https://cae-poweron-shared.redwater-53d21339.switzerlandnorth.azurecontainerapps.io # Debug Configuration APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE diff --git a/modules/features/teamsbot/browserBotConnector.py b/modules/features/teamsbot/browserBotConnector.py new file mode 100644 index 00000000..b0fa5310 --- /dev/null +++ b/modules/features/teamsbot/browserBotConnector.py @@ -0,0 +1,149 @@ +# Copyright (c) 2025 Patrick Motsch +# All rights reserved. +""" +Browser Bot Connector - Communication with the Node.js Browser Bot Service. +Handles HTTP and WebSocket communication for meeting join/leave and transcript/audio streaming. + +This replaces the old .NET Media Bridge connector with a simpler browser-based approach. +""" + +import logging +import aiohttp +import asyncio +from typing import Optional, Dict, Any, Callable, Awaitable + +logger = logging.getLogger(__name__) + +# Default timeout for bot HTTP calls +_BOT_TIMEOUT = aiohttp.ClientTimeout(total=30) + + +class BrowserBotConnector: + """Connector to the Node.js Browser Bot service.""" + + def __init__(self, botUrl: Optional[str] = None): + self.botUrl = (botUrl or "").rstrip("/") + + def _isConfigured(self) -> bool: + """Check if the bot URL is configured.""" + return bool(self.botUrl) + + async def joinMeeting( + self, + sessionId: str, + meetingUrl: str, + botName: str, + instanceId: str, + gatewayWsUrl: str, + ) -> Dict[str, Any]: + """ + Send join command to the Browser Bot service. + + The bot will: + 1. Launch a headless browser + 2. Navigate to Teams web app + 3. Join the meeting + 4. Enable captions and start scraping + 5. Connect back to Gateway via WebSocket using gatewayWsUrl + + Args: + gatewayWsUrl: Full WebSocket URL for the bot to connect back to + (e.g. wss://gateway-int.poweron-center.net/api/teamsbot/{instanceId}/bot/ws/{sessionId}) + + Returns: + Dict with 'success' bool and optional 'error' string. + """ + if not self._isConfigured(): + logger.warning("Browser Bot URL not configured. Simulating join for development.") + return { + "success": True, + "message": "Development mode: Browser Bot not connected" + } + + payload = { + "sessionId": sessionId, + "meetingUrl": meetingUrl, + "botName": botName, + "instanceId": instanceId, + "gatewayWsUrl": gatewayWsUrl, # Full WebSocket URL for bot to connect back + } + + try: + async with aiohttp.ClientSession(timeout=_BOT_TIMEOUT) as session: + async with session.post(f"{self.botUrl}/api/bot", json=payload) as resp: + if resp.status == 200: + data = await resp.json() + return { + "success": data.get("success", True), + } + else: + errorText = await resp.text() + logger.error(f"Browser Bot join failed: {resp.status} - {errorText}") + return {"success": False, "error": f"Bot returned {resp.status}: {errorText}"} + + except aiohttp.ClientError as e: + logger.error(f"Browser Bot connection error: {e}") + return {"success": False, "error": f"Bot connection failed: {str(e)}"} + except Exception as e: + logger.error(f"Browser Bot join error: {e}") + return {"success": False, "error": str(e)} + + async def leaveMeeting(self, sessionId: str) -> Dict[str, Any]: + """Send leave command to the Browser Bot service.""" + if not self._isConfigured(): + logger.warning("Browser Bot URL not configured. Simulating leave for development.") + return {"success": True, "message": "Development mode: Browser Bot not connected"} + + try: + async with aiohttp.ClientSession(timeout=_BOT_TIMEOUT) as session: + async with session.post(f"{self.botUrl}/api/bot/{sessionId}/leave") as resp: + if resp.status == 200: + return {"success": True} + else: + errorText = await resp.text() + logger.error(f"Browser Bot leave failed: {resp.status} - {errorText}") + return {"success": False, "error": f"Bot returned {resp.status}: {errorText}"} + + except Exception as e: + logger.error(f"Browser Bot leave error: {e}") + return {"success": False, "error": str(e)} + + async def getStatus(self, sessionId: Optional[str] = None) -> Dict[str, Any]: + """Get bot health and optionally session status.""" + if not self._isConfigured(): + return {"healthy": False, "message": "Browser Bot URL not configured"} + + try: + async with aiohttp.ClientSession(timeout=_BOT_TIMEOUT) as session: + if sessionId: + url = f"{self.botUrl}/api/bot/{sessionId}/status" + else: + url = f"{self.botUrl}/health" + + async with session.get(url) as resp: + if resp.status == 200: + data = await resp.json() + return {"healthy": True, **data} + else: + return {"healthy": False, "error": f"Bot returned {resp.status}"} + + except Exception as e: + return {"healthy": False, "error": str(e)} + + async def sendAudio( + self, + sessionId: str, + audioData: bytes, + audioFormat: str = "mp3", + ) -> bool: + """ + Send TTS audio to the bot for playback in the meeting. + This is called via the WebSocket connection, not HTTP. + + Note: This method is here for reference but actual audio is sent + via the WebSocket in the route handler. + """ + # Audio is sent via WebSocket, not HTTP + # This method is a placeholder for documentation + logger.debug(f"sendAudio called for session {sessionId} - should use WebSocket") + return True diff --git a/modules/features/teamsbot/datamodelTeamsbot.py b/modules/features/teamsbot/datamodelTeamsbot.py index 8e3dc60c..d85e6e2f 100644 --- a/modules/features/teamsbot/datamodelTeamsbot.py +++ b/modules/features/teamsbot/datamodelTeamsbot.py @@ -116,17 +116,17 @@ class TeamsbotConfig(BaseModel): responseMode: TeamsbotResponseMode = Field(default=TeamsbotResponseMode.AUTO, description="How the bot responds") language: str = Field(default="de-DE", description="Primary language for STT/TTS") voiceId: Optional[str] = Field(default=None, description="Google TTS voice ID (e.g., de-DE-Standard-A)") - bridgeUrl: Optional[str] = Field(default=None, description="URL of the .NET Media Bridge service. Falls back to TEAMSBOT_BRIDGE_URL env variable if not set per-instance.") + browserBotUrl: Optional[str] = Field(default=None, description="URL of the Browser Bot service. Falls back to TEAMSBOT_BROWSER_BOT_URL env variable if not set per-instance.") triggerIntervalSeconds: int = Field(default=10, ge=3, le=60, description="Seconds between periodic AI analysis triggers") triggerCooldownSeconds: int = Field(default=3, ge=1, le=30, description="Minimum seconds between AI calls") contextWindowSegments: int = Field(default=20, ge=5, le=100, description="Number of transcript segments to include in AI context") - def _getEffectiveBridgeUrl(self) -> Optional[str]: - """Resolve the effective bridge URL: per-instance config takes priority, then env variable.""" - if self.bridgeUrl: - return self.bridgeUrl + def _getEffectiveBrowserBotUrl(self) -> Optional[str]: + """Resolve the effective browser bot URL: per-instance config takes priority, then env variable.""" + if self.browserBotUrl: + return self.browserBotUrl from modules.shared.configuration import APP_CONFIG - return APP_CONFIG.get("TEAMSBOT_BRIDGE_URL") + return APP_CONFIG.get("TEAMSBOT_BROWSER_BOT_URL") # ============================================================================ @@ -156,7 +156,7 @@ class TeamsbotConfigUpdateRequest(BaseModel): responseMode: Optional[TeamsbotResponseMode] = None language: Optional[str] = None voiceId: Optional[str] = None - bridgeUrl: Optional[str] = None + browserBotUrl: Optional[str] = None triggerIntervalSeconds: Optional[int] = None triggerCooldownSeconds: Optional[int] = None contextWindowSegments: Optional[int] = None diff --git a/modules/features/teamsbot/routeFeatureTeamsbot.py b/modules/features/teamsbot/routeFeatureTeamsbot.py index 9821fc6c..997b9a58 100644 --- a/modules/features/teamsbot/routeFeatureTeamsbot.py +++ b/modules/features/teamsbot/routeFeatureTeamsbot.py @@ -338,92 +338,36 @@ async def updateConfig( # ========================================================================= -# Bridge Communication Endpoints (called by .NET Media Bridge) +# Browser Bot Communication Endpoints # ========================================================================= -@router.post("/{instanceId}/bridge/status") -async def bridgeStatusCallback( - instanceId: str, - statusData: dict, -): - """ - Callback endpoint for the .NET Media Bridge to report status updates. - Called when: bot joins/leaves meeting, errors occur, etc. - Note: No user auth -- authenticated via bridge API key. - """ - sessionId = statusData.get("sessionId") - status = statusData.get("status") - errorMessage = statusData.get("errorMessage") - - if not sessionId or not status: - raise HTTPException(status_code=400, detail="Missing sessionId or status") - - # TODO: Validate bridge API key from headers - - logger.info(f"Bridge status callback: session={sessionId}, status={status}") - - try: - # Load the original user who started the session (has RBAC roles in mandate) - from modules.datamodels.datamodelUam import User - from modules.interfaces.interfaceDbApp import getRootInterface - - systemUser = User(id="system", username="system", email="system@poweron.swiss") - interface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId) - - # Look up original user from session for consistent context - session = interface.getSession(sessionId) - startedByUserId = session.get("startedByUserId") if session else None - if startedByUserId: - rootInterface = getRootInterface() - originalUser = rootInterface.getUser(startedByUserId) - if originalUser: - interface = interfaceDb.getInterface(originalUser, featureInstanceId=instanceId) - - updates = {"status": status} - if errorMessage: - updates["errorMessage"] = errorMessage - if status == TeamsbotSessionStatus.ACTIVE.value: - from modules.shared.timeUtils import getUtcTimestamp - updates["startedAt"] = getUtcTimestamp() - elif status in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]: - from modules.shared.timeUtils import getUtcTimestamp - updates["endedAt"] = getUtcTimestamp() - - interface.updateSession(sessionId, updates) - - # Emit SSE event - from .service import _emitSessionEvent - await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage}) - - except Exception as e: - logger.error(f"Bridge status callback processing failed: session={sessionId}, error={e}") - # Still return 200 so the bridge doesn't retry endlessly - - return {"received": True} - - -@router.websocket("/{instanceId}/bridge/audio/{sessionId}") -async def bridgeAudioWebsocket( +@router.websocket("/{instanceId}/bot/ws/{sessionId}") +async def botWebsocket( websocket: WebSocket, instanceId: str, sessionId: str, ): """ - Bidirectional WebSocket for audio streaming with the .NET Media Bridge. - Bridge sends: PCM audio frames (LINEAR16, 16kHz, mono) - Gateway sends: TTS audio responses + Bidirectional WebSocket for communication with the Browser Bot. + + Bot sends: + - transcript: Caption text scraped from Teams meeting + - status: Bot state changes (joined, in_lobby, left, error) + + Gateway sends: + - playAudio: TTS audio for the bot to play in the meeting """ await websocket.accept() - logger.info(f"Bridge audio WebSocket connected: session={sessionId}, instance={instanceId}") + logger.info(f"Browser Bot WebSocket connected: session={sessionId}, instance={instanceId}") - # TODO: Validate bridge API key from headers/query params + # TODO: Validate bot API key from headers/query params try: config = _getInstanceConfig(instanceId) - logger.info(f"Bridge audio WebSocket config loaded: session={sessionId}") + logger.info(f"Browser Bot WebSocket config loaded: session={sessionId}") # Load the original user who started the session (has RBAC roles in mandate) - # Bridge callbacks have no HTTP auth, so we reconstruct the user context from the session record. + # Bot callbacks have no HTTP auth, so we reconstruct the user context from the session record. from modules.datamodels.datamodelUam import User from modules.interfaces.interfaceDbApp import getRootInterface @@ -442,12 +386,12 @@ async def bridgeAudioWebsocket( originalUser = systemUser service = TeamsbotService(originalUser, mandateId, instanceId, config) - logger.info(f"Bridge audio WebSocket service created: session={sessionId}, mandateId={mandateId}, user={originalUser.id}") + logger.info(f"Browser Bot WebSocket service created: session={sessionId}, mandateId={mandateId}, user={originalUser.id}") - await service.handleAudioStream(websocket, sessionId) + await service.handleBotWebSocket(websocket, sessionId) except WebSocketDisconnect: - logger.info(f"Bridge audio WebSocket disconnected: session={sessionId}") + logger.info(f"Browser Bot WebSocket disconnected: session={sessionId}") except Exception as e: - logger.error(f"Bridge audio WebSocket error: session={sessionId}, error={e}", exc_info=True) + logger.error(f"Browser Bot WebSocket error: session={sessionId}, error={e}", exc_info=True) finally: - logger.info(f"Bridge audio WebSocket closed: session={sessionId}") + logger.info(f"Browser Bot WebSocket closed: session={sessionId}") diff --git a/modules/features/teamsbot/service.py b/modules/features/teamsbot/service.py index ea10fddd..50a938e0 100644 --- a/modules/features/teamsbot/service.py +++ b/modules/features/teamsbot/service.py @@ -27,7 +27,7 @@ from .datamodelTeamsbot import ( TeamsbotResponseMode, SpeechTeamsResponse, ) -from .bridgeConnector import BridgeConnector +from .browserBotConnector import BrowserBotConnector logger = logging.getLogger(__name__) @@ -71,7 +71,7 @@ class TeamsbotService: self.mandateId = mandateId self.instanceId = instanceId self.config = config - self.bridgeConnector = BridgeConnector(config._getEffectiveBridgeUrl()) + self.browserBotConnector = BrowserBotConnector(config._getEffectiveBrowserBotUrl()) # State self._lastAiCallTime: float = 0.0 @@ -88,11 +88,14 @@ class TeamsbotService: connectionId: Optional[str] = None, gatewayBaseUrl: str = "", ): - """Send join command to the .NET Media Bridge. + """Send join command to the Browser Bot service. - Args: - gatewayBaseUrl: Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net). - Passed to the bridge so it can build full callback/WS URLs per-session. + The browser bot will: + 1. Launch a headless browser + 2. Navigate to Teams web app + 3. Join the meeting as anonymous guest + 4. Enable captions and start scraping + 5. Connect back via WebSocket to send transcripts """ from . import interfaceFeatureTeamsbot as interfaceDb @@ -106,30 +109,30 @@ class TeamsbotService: interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.JOINING.value}) await _emitSessionEvent(sessionId, "statusChange", {"status": "joining"}) - # Send join command to bridge + # Send join command to browser bot session = interface.getSession(sessionId) if not session: raise ValueError(f"Session {sessionId} not found") - result = await self.bridgeConnector.joinMeeting( - meetingLink=meetingLink, - botName=session.get("botName", self.config.botName), - backgroundImageUrl=session.get("backgroundImageUrl"), + # Build the full WebSocket URL for the bot to connect back to this gateway instance + # gatewayBaseUrl is passed from the route handler (derived from request.base_url) + wsScheme = "wss" if gatewayBaseUrl.startswith("https") else "ws" + gatewayHost = gatewayBaseUrl.replace("https://", "").replace("http://", "").rstrip("/") + fullGatewayWsUrl = f"{wsScheme}://{gatewayHost}/api/teamsbot/{self.instanceId}/bot/ws/{sessionId}" + + result = await self.browserBotConnector.joinMeeting( sessionId=sessionId, - gatewayCallbackUrl=f"/api/teamsbot/{self.instanceId}/bridge/status", - gatewayWsUrl=f"/api/teamsbot/{self.instanceId}/bridge/audio/{sessionId}", - gatewayBaseUrl=gatewayBaseUrl, + meetingUrl=meetingLink, + botName=session.get("botName", self.config.botName), + instanceId=self.instanceId, + gatewayWsUrl=fullGatewayWsUrl, ) if result.get("success"): - bridgeSessionId = result.get("bridgeSessionId") interface.updateSession(sessionId, { - "bridgeSessionId": bridgeSessionId, - "status": TeamsbotSessionStatus.ACTIVE.value, - "startedAt": getUtcTimestamp(), + "status": TeamsbotSessionStatus.JOINING.value, # Will become ACTIVE when bot connects via WS }) - await _emitSessionEvent(sessionId, "statusChange", {"status": "active"}) - logger.info(f"Bot joined meeting for session {sessionId}") + logger.info(f"Browser bot deployment started for session {sessionId}") else: errorMsg = result.get("error", "Unknown error joining meeting") interface.updateSession(sessionId, { @@ -137,7 +140,7 @@ class TeamsbotService: "errorMessage": errorMsg, }) await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": errorMsg}) - logger.error(f"Failed to join meeting for session {sessionId}: {errorMsg}") + logger.error(f"Failed to deploy browser bot for session {sessionId}: {errorMsg}") except Exception as e: logger.error(f"Error joining meeting for session {sessionId}: {e}") @@ -148,7 +151,7 @@ class TeamsbotService: await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)}) async def leaveMeeting(self, sessionId: str): - """Send leave command to the .NET Media Bridge.""" + """Send leave command to the Browser Bot service.""" from . import interfaceFeatureTeamsbot as interfaceDb interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) @@ -157,7 +160,7 @@ class TeamsbotService: interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.LEAVING.value}) await _emitSessionEvent(sessionId, "statusChange", {"status": "leaving"}) - await self.bridgeConnector.leaveMeeting(sessionId) + await self.browserBotConnector.leaveMeeting(sessionId) interface.updateSession(sessionId, { "status": TeamsbotSessionStatus.ENDED.value, @@ -182,13 +185,19 @@ class TeamsbotService: _sessionEvents.pop(sessionId, None) # ========================================================================= - # Audio Processing Pipeline + # Browser Bot WebSocket Communication # ========================================================================= - async def handleAudioStream(self, websocket: WebSocket, sessionId: str): + async def handleBotWebSocket(self, websocket: WebSocket, sessionId: str): """ - Main audio processing loop. - Receives PCM audio from bridge via WebSocket, processes through STT -> AI -> TTS pipeline. + Main WebSocket handler for Browser Bot communication. + + Receives: + - transcript: Caption text scraped from Teams + - status: Bot state changes (joined, in_lobby, left, error) + + Sends: + - playAudio: TTS audio for the bot to play in the meeting """ from . import interfaceFeatureTeamsbot as interfaceDb from modules.interfaces.interfaceVoiceObjects import getVoiceInterface @@ -196,128 +205,106 @@ class TeamsbotService: interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId) voiceInterface = getVoiceInterface(self.currentUser, self.mandateId) - audioBuffer = bytearray() - bufferDurationMs = 0 - targetBufferMs = 3000 # Buffer 3 seconds of audio before STT - - # PCM16 at 16kHz mono = 32000 bytes/second - bytesPerSecond = 32000 - bytesPerMs = bytesPerSecond / 1000 - - # Track background STT/AI tasks so they don't block the WebSocket loop - backgroundTasks: list[asyncio.Task] = [] - - logger.info(f"Audio processing started for session {sessionId}") + logger.info(f"Browser Bot WebSocket connected for session {sessionId}") try: while True: - # Receive audio data from bridge data = await websocket.receive() - if "bytes" in data: - audioChunk = data["bytes"] - elif "text" in data: - # JSON message (control messages) - message = json.loads(data["text"]) - msgType = message.get("type") - - if msgType == "audio_chunk": - audioChunk = base64.b64decode(message.get("data", "")) - elif msgType == "session_ended": - logger.info(f"Bridge signaled session ended: {sessionId}") - break - elif msgType == "ping": - await websocket.send_text(json.dumps({"type": "pong"})) - continue - else: - continue - else: + if "text" not in data: continue - # Accumulate audio in buffer - audioBuffer.extend(audioChunk) - bufferDurationMs = len(audioBuffer) / bytesPerMs + message = json.loads(data["text"]) + msgType = message.get("type") - # Process when buffer has enough audio - run in background to not block WebSocket - if bufferDurationMs >= targetBufferMs: - chunkBytes = bytes(audioBuffer) - audioBuffer.clear() - bufferDurationMs = 0 - - task = asyncio.create_task( - self._processAudioBuffer( - chunkBytes, - sessionId, - interface, - voiceInterface, - websocket, - ) + if msgType == "transcript": + # Process transcript from captions + transcript = message.get("transcript", {}) + await self._processTranscript( + sessionId=sessionId, + speaker=transcript.get("speaker", "Unknown"), + text=transcript.get("text", ""), + isFinal=transcript.get("isFinal", True), + interface=interface, + voiceInterface=voiceInterface, + websocket=websocket, ) - backgroundTasks.append(task) - # Clean up completed tasks - backgroundTasks = [t for t in backgroundTasks if not t.done()] + elif msgType == "status": + # Handle status updates from bot + status = message.get("status") + errorMessage = message.get("message") + await self._handleBotStatus(sessionId, status, errorMessage, interface) + + elif msgType == "ping": + await websocket.send_text(json.dumps({"type": "pong"})) except Exception as e: if "disconnect" not in str(e).lower(): - logger.error(f"Audio stream error for session {sessionId}: {e}") + logger.error(f"Browser Bot WebSocket error for session {sessionId}: {e}") - # Process remaining buffer - if len(audioBuffer) > 0: - await self._processAudioBuffer( - bytes(audioBuffer), - sessionId, - interface, - voiceInterface, - websocket, - ) + logger.info(f"Browser Bot WebSocket disconnected for session {sessionId}") - # Wait for any remaining background tasks - if backgroundTasks: - await asyncio.gather(*backgroundTasks, return_exceptions=True) - - logger.info(f"Audio processing ended for session {sessionId}") - - async def _processAudioBuffer( + async def _handleBotStatus( self, - audioBytes: bytes, sessionId: str, + status: str, + errorMessage: Optional[str], + interface, + ): + """Handle status updates from the browser bot.""" + logger.info(f"Bot status update for session {sessionId}: {status}") + + statusMap = { + "connecting": TeamsbotSessionStatus.JOINING.value, + "in_lobby": TeamsbotSessionStatus.JOINING.value, # Still joining, waiting in lobby + "joined": TeamsbotSessionStatus.ACTIVE.value, + "left": TeamsbotSessionStatus.ENDED.value, + "error": TeamsbotSessionStatus.ERROR.value, + } + + dbStatus = statusMap.get(status, TeamsbotSessionStatus.ACTIVE.value) + + updates = {"status": dbStatus} + if errorMessage: + updates["errorMessage"] = errorMessage + if dbStatus == TeamsbotSessionStatus.ACTIVE.value: + updates["startedAt"] = getUtcTimestamp() + elif dbStatus in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]: + updates["endedAt"] = getUtcTimestamp() + + interface.updateSession(sessionId, updates) + await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage}) + + # Generate summary when session ends + if dbStatus == TeamsbotSessionStatus.ENDED.value: + asyncio.create_task(self._generateMeetingSummary(sessionId)) + + async def _processTranscript( + self, + sessionId: str, + speaker: str, + text: str, + isFinal: bool, interface, voiceInterface, websocket: WebSocket, ): - """Process a buffered audio chunk through the STT -> AI -> TTS pipeline.""" + """Process a transcript segment from the browser bot's caption scraping.""" - # Step 1: STT -- convert audio to text - # skipFallbacks=True because we know the exact format (LINEAR16, 16kHz, mono from Teams) - try: - sttResult = await voiceInterface.speechToText( - audioContent=audioBytes, - language=self.config.language, - sampleRate=16000, - channels=1, - skipFallbacks=True - ) - except Exception as e: - logger.warning(f"STT failed for session {sessionId}: {e}") + text = text.strip() + if not text: return - transcriptText = sttResult.get("text", "").strip() if isinstance(sttResult, dict) else str(sttResult).strip() - if not transcriptText: - return - - confidence = sttResult.get("confidence", 0.0) if isinstance(sttResult, dict) else 0.0 - speaker = sttResult.get("speaker") if isinstance(sttResult, dict) else None - # Store transcript segment transcriptData = TeamsbotTranscript( sessionId=sessionId, speaker=speaker, - text=transcriptText, + text=text, timestamp=str(getUtcTimestamp()), - confidence=confidence, + confidence=1.0, # Captions don't have confidence scores language=self.config.language, - isFinal=True, + isFinal=isFinal, ).model_dump() createdTranscript = interface.createTranscript(transcriptData) @@ -325,7 +312,7 @@ class TeamsbotService: # Update context buffer self._contextBuffer.append({ "speaker": speaker or "Unknown", - "text": transcriptText, + "text": text, "timestamp": getUtcTimestamp(), }) # Keep only last N segments @@ -337,8 +324,8 @@ class TeamsbotService: await _emitSessionEvent(sessionId, "transcript", { "id": createdTranscript.get("id"), "speaker": speaker, - "text": transcriptText, - "confidence": confidence, + "text": text, + "confidence": 1.0, "timestamp": getUtcTimestamp(), }) @@ -348,14 +335,17 @@ class TeamsbotService: count = session.get("transcriptSegmentCount", 0) + 1 interface.updateSession(sessionId, {"transcriptSegmentCount": count}) - # Step 2: Check if AI analysis should be triggered + # Check if AI analysis should be triggered (only for final transcripts) + if not isFinal: + return + if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY: return - if not self._shouldTriggerAnalysis(transcriptText): + if not self._shouldTriggerAnalysis(text): return - # Step 3: SPEECH_TEAMS AI analysis + # SPEECH_TEAMS AI analysis await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript) def _shouldTriggerAnalysis(self, transcriptText: str) -> bool: