""" Azure Voice Services WebSocket Routes Provides real-time WebSocket endpoints for: - Live microphone audio streaming - Real-time speech-to-text - Real-time translation - Real-time text-to-speech """ import logging import asyncio import json from typing import Dict, Any, Optional, List from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends, Query from fastapi.websockets import WebSocketState import base64 import io from datetime import datetime from modules.interfaces.interfaceAppObjects import getRootInterface from modules.interfaces.interfaceAppModel import UserInDB from modules.security.auth import getCurrentUser from modules.connectors.connectorAzureSpeech import ConnectorAzureSpeech logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/voice/ws", tags=["voice-websocket"]) class ConnectionManager: """Manages WebSocket connections for real-time voice services.""" def __init__(self): self.active_connections: Dict[str, WebSocket] = {} self.user_connections: Dict[str, str] = {} # user_id -> connection_id self.connection_metadata: Dict[str, Dict] = {} async def connect(self, websocket: WebSocket, connection_id: str, user_id: str, metadata: Dict = None): """Accept a WebSocket connection.""" await websocket.accept() self.active_connections[connection_id] = websocket self.user_connections[user_id] = connection_id self.connection_metadata[connection_id] = metadata or {} logger.info(f"WebSocket connected: {connection_id} for user {user_id}") def disconnect(self, connection_id: str): """Remove a WebSocket connection.""" if connection_id in self.active_connections: del self.active_connections[connection_id] # Remove from user mapping user_id = None for uid, cid in self.user_connections.items(): if cid == connection_id: user_id = uid break if user_id: del self.user_connections[user_id] if connection_id in self.connection_metadata: del self.connection_metadata[connection_id] logger.info(f"WebSocket disconnected: {connection_id}") async def send_message(self, connection_id: str, message: Dict): """Send a message to a specific connection.""" if connection_id in self.active_connections: websocket = self.active_connections[connection_id] try: if websocket.client_state == WebSocketState.CONNECTED: await websocket.send_text(json.dumps(message)) else: self.disconnect(connection_id) except Exception as e: logger.error(f"Error sending message to {connection_id}: {str(e)}") self.disconnect(connection_id) async def send_error(self, connection_id: str, error: str, error_code: str = "GENERAL_ERROR"): """Send an error message to a connection.""" await self.send_message(connection_id, { "type": "error", "error": error, "error_code": error_code, "timestamp": datetime.now().isoformat() }) def get_connection_metadata(self, connection_id: str) -> Dict: """Get metadata for a connection.""" return self.connection_metadata.get(connection_id, {}) # Global connection manager manager = ConnectionManager() async def get_azure_speech_connector_ws(user_id: str) -> Optional[ConnectorAzureSpeech]: """Get Azure Speech connector for WebSocket user.""" try: root_interface = getRootInterface() # Get user by ID user = root_interface.getUser(user_id) if not user: logger.error(f"User with ID {user_id} not found") return None # Get user connections user_connections = root_interface.getUserConnections(user_id) azure_connection = None # Find first Azure connection for connection in user_connections: if connection.authority == "msft": azure_connection = connection break if not azure_connection: logger.error(f"No Azure connection found for user {user_id}") return None # Get connection token connection_token = root_interface.getConnectionToken(azure_connection.id) if not connection_token: logger.error(f"No connection token found for user {user_id}") return None # Use Azure Speech Services subscription key from configuration from modules.shared.configuration import APP_CONFIG subscription_key = APP_CONFIG.get("Connector_AzureSpeech_SUBSCRIPTION_KEY") if not subscription_key or subscription_key == "your-azure-speech-subscription-key-here": logger.error("Azure Speech Services subscription key not configured") return None # Get region from configuration region = APP_CONFIG.get("Connector_AzureSpeech_REGION", "westeurope") # Create Azure Speech connector connector = ConnectorAzureSpeech(subscription_key=subscription_key, region=region) return connector except Exception as e: logger.error(f"Error getting Azure Speech connector for WebSocket: {str(e)}") return None @router.websocket("/realtime-interpreter") async def websocket_realtime_interpreter( websocket: WebSocket, user_id: str = Query(...), from_language: str = Query("de-DE"), to_language: str = Query("en-US"), audio_format: str = Query("wav") ): """WebSocket endpoint for real-time interpreter with live audio streaming.""" connection_id = f"interpreter_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: # Connect WebSocket await manager.connect(websocket, connection_id, user_id, { "service": "realtime_interpreter", "from_language": from_language, "to_language": to_language, "audio_format": audio_format }) # Get Azure Speech connector connector = await get_azure_speech_connector_ws(user_id) if not connector: await manager.send_error(connection_id, "Azure Speech Services not configured", "NO_CONNECTOR") return # Send connection confirmation await manager.send_message(connection_id, { "type": "connected", "connection_id": connection_id, "service": "realtime_interpreter", "from_language": from_language, "to_language": to_language, "timestamp": datetime.now().isoformat() }) # Process incoming audio chunks audio_buffer = io.BytesIO() chunk_count = 0 while True: try: # Receive message from client data = await websocket.receive_text() message = json.loads(data) if message.get("type") == "audio_chunk": # Decode base64 audio data audio_data = base64.b64decode(message.get("data", "")) audio_buffer.write(audio_data) chunk_count += 1 # Process every 10 chunks or when buffer is large enough if chunk_count % 10 == 0 or len(audio_data) > 8192: audio_buffer.seek(0) audio_content = audio_buffer.read() if len(audio_content) > 0: try: # Convert speech to text stt_result = await connector.speech_to_text( audio_content=audio_content, language=from_language, format="detailed", audio_format=audio_format ) original_text = stt_result.get("text", "") # Translate if different languages translated_text = original_text if from_language != to_language and original_text.strip(): try: translated_text = await connector.translate_text( text=original_text, from_language=from_language, to_language=to_language ) except Exception as e: logger.warning(f"Translation failed: {str(e)}") translated_text = original_text # Send result back to client await manager.send_message(connection_id, { "type": "translation_result", "original_text": original_text, "translated_text": translated_text, "from_language": from_language, "to_language": to_language, "confidence": stt_result.get("confidence", 0.0), "timestamp": datetime.now().isoformat() }) except Exception as e: logger.error(f"Error processing audio chunk: {str(e)}") await manager.send_error(connection_id, f"Audio processing failed: {str(e)}", "PROCESSING_ERROR") # Reset buffer audio_buffer = io.BytesIO() chunk_count = 0 elif message.get("type") == "ping": # Respond to ping with pong await manager.send_message(connection_id, { "type": "pong", "timestamp": datetime.now().isoformat() }) elif message.get("type") == "disconnect": # Client requested disconnect break except WebSocketDisconnect: break except json.JSONDecodeError: await manager.send_error(connection_id, "Invalid JSON message", "INVALID_JSON") except Exception as e: logger.error(f"Error processing WebSocket message: {str(e)}") await manager.send_error(connection_id, f"Message processing failed: {str(e)}", "MESSAGE_ERROR") except WebSocketDisconnect: pass except Exception as e: logger.error(f"WebSocket error: {str(e)}") try: await manager.send_error(connection_id, f"Connection error: {str(e)}", "CONNECTION_ERROR") except: pass finally: manager.disconnect(connection_id) @router.websocket("/speech-to-text") async def websocket_speech_to_text( websocket: WebSocket, user_id: str = Query(...), language: str = Query("de-DE"), audio_format: str = Query("wav") ): """WebSocket endpoint for real-time speech-to-text with live audio streaming.""" connection_id = f"stt_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: # Connect WebSocket await manager.connect(websocket, connection_id, user_id, { "service": "speech_to_text", "language": language, "audio_format": audio_format }) # Get Azure Speech connector connector = await get_azure_speech_connector_ws(user_id) if not connector: await manager.send_error(connection_id, "Azure Speech Services not configured", "NO_CONNECTOR") return # Send connection confirmation await manager.send_message(connection_id, { "type": "connected", "connection_id": connection_id, "service": "speech_to_text", "language": language, "timestamp": datetime.now().isoformat() }) # Process incoming audio chunks audio_buffer = io.BytesIO() chunk_count = 0 while True: try: # Receive message from client data = await websocket.receive_text() message = json.loads(data) if message.get("type") == "audio_chunk": # Decode base64 audio data audio_data = base64.b64decode(message.get("data", "")) audio_buffer.write(audio_data) chunk_count += 1 # Process every 10 chunks or when buffer is large enough if chunk_count % 10 == 0 or len(audio_data) > 8192: audio_buffer.seek(0) audio_content = audio_buffer.read() if len(audio_content) > 0: try: # Convert speech to text stt_result = await connector.speech_to_text( audio_content=audio_content, language=language, format="detailed", audio_format=audio_format ) # Send result back to client await manager.send_message(connection_id, { "type": "transcription_result", "text": stt_result.get("text", ""), "confidence": stt_result.get("confidence", 0.0), "language": stt_result.get("language", language), "is_final": stt_result.get("RecognitionStatus") == "Success", "timestamp": datetime.now().isoformat() }) except Exception as e: logger.error(f"Error processing audio chunk: {str(e)}") await manager.send_error(connection_id, f"Audio processing failed: {str(e)}", "PROCESSING_ERROR") # Reset buffer audio_buffer = io.BytesIO() chunk_count = 0 elif message.get("type") == "ping": # Respond to ping with pong await manager.send_message(connection_id, { "type": "pong", "timestamp": datetime.now().isoformat() }) elif message.get("type") == "disconnect": # Client requested disconnect break except WebSocketDisconnect: break except json.JSONDecodeError: await manager.send_error(connection_id, "Invalid JSON message", "INVALID_JSON") except Exception as e: logger.error(f"Error processing WebSocket message: {str(e)}") await manager.send_error(connection_id, f"Message processing failed: {str(e)}", "MESSAGE_ERROR") except WebSocketDisconnect: pass except Exception as e: logger.error(f"WebSocket error: {str(e)}") try: await manager.send_error(connection_id, f"Connection error: {str(e)}", "CONNECTION_ERROR") except: pass finally: manager.disconnect(connection_id) @router.websocket("/text-to-speech") async def websocket_text_to_speech( websocket: WebSocket, user_id: str = Query(...), language: str = Query("de-DE"), voice: str = Query("de-DE-KatjaNeural"), audio_format: str = Query("audio-16khz-128kbitrate-mono-mp3") ): """WebSocket endpoint for real-time text-to-speech streaming.""" connection_id = f"tts_{user_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: # Connect WebSocket await manager.connect(websocket, connection_id, user_id, { "service": "text_to_speech", "language": language, "voice": voice, "audio_format": audio_format }) # Get Azure Speech connector connector = await get_azure_speech_connector_ws(user_id) if not connector: await manager.send_error(connection_id, "Azure Speech Services not configured", "NO_CONNECTOR") return # Send connection confirmation await manager.send_message(connection_id, { "type": "connected", "connection_id": connection_id, "service": "text_to_speech", "language": language, "voice": voice, "timestamp": datetime.now().isoformat() }) while True: try: # Receive message from client data = await websocket.receive_text() message = json.loads(data) if message.get("type") == "text_to_speak": text = message.get("text", "") if text.strip(): try: # Convert text to speech audio_data = await connector.text_to_speech( text=text, language=language, voice=voice, format=audio_format ) # Send audio data back to client audio_base64 = base64.b64encode(audio_data).decode('utf-8') await manager.send_message(connection_id, { "type": "audio_data", "audio_data": audio_base64, "format": audio_format, "voice": voice, "text": text, "timestamp": datetime.now().isoformat() }) except Exception as e: logger.error(f"Error converting text to speech: {str(e)}") await manager.send_error(connection_id, f"TTS failed: {str(e)}", "TTS_ERROR") elif message.get("type") == "ping": # Respond to ping with pong await manager.send_message(connection_id, { "type": "pong", "timestamp": datetime.now().isoformat() }) elif message.get("type") == "disconnect": # Client requested disconnect break except WebSocketDisconnect: break except json.JSONDecodeError: await manager.send_error(connection_id, "Invalid JSON message", "INVALID_JSON") except Exception as e: logger.error(f"Error processing WebSocket message: {str(e)}") await manager.send_error(connection_id, f"Message processing failed: {str(e)}", "MESSAGE_ERROR") except WebSocketDisconnect: pass except Exception as e: logger.error(f"WebSocket error: {str(e)}") try: await manager.send_error(connection_id, f"Connection error: {str(e)}", "CONNECTION_ERROR") except: pass finally: manager.disconnect(connection_id) @router.get("/status") async def websocket_status(): """Get WebSocket connection status.""" return { "active_connections": len(manager.active_connections), "connected_users": len(manager.user_connections), "services": { "realtime_interpreter": len([c for c in manager.connection_metadata.values() if c.get("service") == "realtime_interpreter"]), "speech_to_text": len([c for c in manager.connection_metadata.values() if c.get("service") == "speech_to_text"]), "text_to_speech": len([c for c in manager.connection_metadata.values() if c.get("service") == "text_to_speech"]) } }