# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Bridge Connector - Communication with the .NET Media Bridge. Handles HTTP and WebSocket communication for meeting join/leave and audio streaming. """ import logging import aiohttp from typing import Optional, Dict, Any logger = logging.getLogger(__name__) # Default timeout for bridge HTTP calls _BRIDGE_TIMEOUT = aiohttp.ClientTimeout(total=30) class BridgeConnector: """Connector to the .NET Media Bridge service.""" def __init__(self, bridgeUrl: Optional[str] = None): self.bridgeUrl = (bridgeUrl or "").rstrip("/") def _isConfigured(self) -> bool: """Check if the bridge URL is configured.""" return bool(self.bridgeUrl) async def joinMeeting( self, meetingLink: str, botName: str, backgroundImageUrl: Optional[str], sessionId: str, gatewayCallbackUrl: str, gatewayWsUrl: str, gatewayBaseUrl: str = "", ) -> Dict[str, Any]: """ Send join command to the .NET Media Bridge. Args: gatewayBaseUrl: Base URL of this gateway instance so the bridge can build full callback/WebSocket URLs per-session. Returns: Dict with 'success' bool and 'bridgeSessionId' or 'error' string. """ if not self._isConfigured(): logger.warning("Bridge URL not configured. Simulating join for development.") return { "success": True, "bridgeSessionId": f"dev-{sessionId[:8]}", "message": "Development mode: Bridge not connected" } payload = { "meetingLink": meetingLink, "botName": botName, "backgroundImageUrl": backgroundImageUrl, "sessionId": sessionId, "gatewayCallbackUrl": gatewayCallbackUrl, "gatewayWsUrl": gatewayWsUrl, "gatewayBaseUrl": gatewayBaseUrl, } try: async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: async with session.post(f"{self.bridgeUrl}/bridge/join", json=payload) as resp: if resp.status == 200: data = await resp.json() return { "success": True, "bridgeSessionId": data.get("bridgeSessionId"), } else: errorText = await resp.text() logger.error(f"Bridge join failed: {resp.status} - {errorText}") return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"} except aiohttp.ClientError as e: logger.error(f"Bridge connection error: {e}") return {"success": False, "error": f"Bridge connection failed: {str(e)}"} except Exception as e: logger.error(f"Bridge join error: {e}") return {"success": False, "error": str(e)} async def leaveMeeting(self, sessionId: str) -> Dict[str, Any]: """Send leave command to the .NET Media Bridge.""" if not self._isConfigured(): logger.warning("Bridge URL not configured. Simulating leave for development.") return {"success": True, "message": "Development mode: Bridge not connected"} try: async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: async with session.post(f"{self.bridgeUrl}/bridge/leave/{sessionId}") as resp: if resp.status == 200: return {"success": True} else: errorText = await resp.text() logger.error(f"Bridge leave failed: {resp.status} - {errorText}") return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"} except Exception as e: logger.error(f"Bridge leave error: {e}") return {"success": False, "error": str(e)} async def getStatus(self) -> Dict[str, Any]: """Get bridge health and active sessions.""" if not self._isConfigured(): return {"healthy": False, "message": "Bridge URL not configured"} try: async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session: async with session.get(f"{self.bridgeUrl}/bridge/status") as resp: if resp.status == 200: return await resp.json() else: return {"healthy": False, "error": f"Bridge returned {resp.status}"} except Exception as e: return {"healthy": False, "error": str(e)}