# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Google Cloud Voice Services Routes
Replaces Azure voice services with Google Cloud Speech-to-Text and Translation
Includes WebSocket support for real-time voice streaming
"""

import asyncio
import logging
import json
import base64
import secrets
import time
from fastapi import APIRouter, Depends, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
from typing import Optional, Dict, Any, List
from modules.auth import getCurrentUser, getRequestContext, RequestContext, limiter
from modules.datamodels.datamodelUam import User
from modules.interfaces.interfaceVoiceObjects import getVoiceInterface, VoiceObjects
from modules.shared.voiceCatalog import getCatalogPayload

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/voice-google", tags=["Voice Google"])

# Store active WebSocket connections (connectionId -> WebSocket)
activeConnections: Dict[str, WebSocket] = {}


class ConnectionManager:
    """Track accepted WebSocket connections.

    Every connection is recorded twice: in this instance's list and in the
    module-level ``activeConnections`` dict keyed by connection id.
    """

    def __init__(self):
        self.activeConnections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket, connectionId: str):
        """Accept the WebSocket and register it under ``connectionId``."""
        await websocket.accept()
        self.activeConnections.append(websocket)
        activeConnections[connectionId] = websocket
        logger.info(f"WebSocket connected: {connectionId}")

    def disconnect(self, websocket: WebSocket, connectionId: str):
        """Remove the WebSocket from both registries; unknown entries are ignored."""
        if websocket in self.activeConnections:
            self.activeConnections.remove(websocket)
        if connectionId in activeConnections:
            del activeConnections[connectionId]
        logger.info(f"WebSocket disconnected: {connectionId}")

    async def send_personal_message(self, message: dict, websocket: WebSocket):
        """Send ``message`` as JSON text; send failures are logged, not raised."""
        try:
            await websocket.send_text(json.dumps(message))
        except Exception as e:
            logger.error(f"Error sending message: {e}")


manager = ConnectionManager()


def _getVoiceInterface(currentUser: User) -> VoiceObjects:
    """Get voice interface instance with user context.

    Raises:
        HTTPException: 500 if the voice interface cannot be created. The
            original exception is chained as the cause.
    """
    try:
        return getVoiceInterface(currentUser)
    except Exception as e:
        logger.error(f"Failed to initialize voice interface: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to initialize voice interface: {str(e)}"
        ) from e


@router.get("/languages")
async def get_available_languages(currentUser: User = Depends(getCurrentUser)):
    """Return the curated voice/language catalog (single source of truth).

    Each entry: {bcp47, iso, label, flag, defaultVoice}. Same payload as
    /api/voice/languages — both endpoints back the same catalog.
    """
    return {
        "success": True,
        "languages": getCatalogPayload(),
    }


@router.get("/voices")
async def get_available_voices(
    languageCode: Optional[str] = None,
    language_code: Optional[str] = None,  # Accept both camelCase and snake_case
    currentUser: User = Depends(getCurrentUser)
):
    """
    Get available voices from Google Cloud Text-to-Speech.
    Accepts languageCode (camelCase) or language_code (snake_case) query parameter.

    Raises:
        HTTPException: 400 when the voice interface reports failure,
            500 on any unexpected error.
    """
    # Use language_code if provided (frontend sends this), otherwise use languageCode
    if language_code:
        languageCode = language_code

    try:
        logger.info(f"🎤 Getting available voices, language filter: {languageCode}")
        voiceInterface = _getVoiceInterface(currentUser)
        result = await voiceInterface.getAvailableVoices(languageCode=languageCode)

        if result["success"]:
            return {
                "success": True,
                "voices": result["voices"],
                "language_filter": languageCode
            }
        else:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to get voices: {result.get('error', 'Unknown error')}"
            )
    except HTTPException:
        # Already a well-formed HTTP error (including the 400 above) — pass through.
        raise
    except Exception as e:
        logger.error(f"❌ Get voices error: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Failed to get available voices: {str(e)}"
        ) from e


# =========================================================================
# STT Streaming WebSocket — generic, used by all features
# =========================================================================

# Issued WebSocket tokens: token -> {"userId", "mandateId", "expiresAt"}.
# Held in this process's memory only.
# NOTE(review): presumably the token request and the WebSocket must reach the
# same process — confirm against the deployment setup.
_sttTokens: Dict[str, Dict[str, Any]] = {}
# Token lifetime in seconds (reported to the client as "expiresInSeconds").
_STT_TOKEN_TTL = 45
# Largest decoded audio chunk accepted per "audio" message, in bytes.
_STT_MAX_CHUNK_BYTES = 400_000
# Language used when an "open" message carries no language.
_STT_DEFAULT_LANGUAGE = "de-DE"


def _cleanupSttTokens():
    """Drop every stored token whose ``expiresAt`` is not in the future."""
    now = time.time()
    expired = [t for t, p in _sttTokens.items() if p.get("expiresAt", 0) <= now]
    for t in expired:
        _sttTokens.pop(t, None)


@router.post("/stt/token")
@limiter.limit("60/minute")
async def createSttToken(
    request: Request,
    context: RequestContext = Depends(getRequestContext),
):
    """Issue a short-lived single-use token for the STT streaming WebSocket."""
    _cleanupSttTokens()
    token = secrets.token_urlsafe(32)
    _sttTokens[token] = {
        "userId": str(context.user.id),
        "mandateId": str(getattr(context, "mandateId", "") or ""),
        "expiresAt": time.time() + _STT_TOKEN_TTL,
    }
    return {"wsToken": token, "expiresInSeconds": _STT_TOKEN_TTL}


@router.websocket("/stt/stream")
async def sttStream(
    websocket: WebSocket,
    wsToken: Optional[str] = Query(None),
):
    """
    Generic STT streaming WebSocket.

    Protocol:
      Client sends JSON:
        {"type": "open", "language": "de-DE"}
        {"type": "audio", "chunk": "<base64-encoded audio>"}
        {"type": "close"}
        {"type": "ping"}
      Server sends JSON:
        {"type": "status", "label": "..."}
        {"type": "interim", "text": "..."}
        {"type": "final", "text": "...", "confidence": 0.95}
        {"type": "reconnect_required"}
        {"type": "pong"}
        {"type": "error", "message": "..."}   (may also carry "code")
        {"type": "closed"}

    Authentication uses the single-use ``wsToken`` from POST /stt/token; the
    token is consumed on first use. Rejections are reported as an error frame
    followed by close code 1008.

    Malformed client frames (invalid JSON, non-object JSON, invalid base64)
    are answered with an error frame and the stream stays open, matching the
    handling of oversized chunks.
    """
    # Accept first so that rejections can be reported as JSON error frames.
    await websocket.accept()

    # --- authenticate via wsToken ---
    if not wsToken:
        await websocket.send_json({"type": "error", "code": "ws_token_required", "message": "wsToken query param required"})
        await websocket.close(code=1008)
        return

    _cleanupSttTokens()
    # pop() makes the token single-use.
    tokenPayload = _sttTokens.pop(wsToken, None)
    if not tokenPayload:
        await websocket.send_json({"type": "error", "code": "ws_token_invalid", "message": "Invalid or expired wsToken"})
        await websocket.close(code=1008)
        return

    tokenUserId = tokenPayload["userId"]
    tokenMandateId = tokenPayload.get("mandateId", "")

    # Resolve real user for billing
    from modules.interfaces.interfaceDbApp import getRootInterface
    rootInterface = getRootInterface()
    currentUser = rootInterface.getUser(tokenUserId)
    if not currentUser:
        await websocket.send_json({"type": "error", "code": "user_not_found", "message": "User not found"})
        await websocket.close(code=1008)
        return

    # --- billing pre-flight ---
    # Best effort: a failure of the billing lookup itself is logged and the
    # stream proceeds. Only the lookup is guarded, so an error while sending
    # the rejection is not misreported as a skipped pre-flight.
    billingService = None
    billingAllowed = True
    try:
        from modules.serviceCenter.services.serviceBilling.mainServiceBilling import getService as getBillingService
        billingService = getBillingService(currentUser, tokenMandateId)
        billingCheck = billingService.checkBalance(0.0)
        billingAllowed = bool(billingCheck.allowed)
    except Exception as e:
        logger.warning(f"STT billing pre-flight skipped: {e}")

    if not billingAllowed:
        await websocket.send_json({"type": "error", "code": "billing_insufficient", "message": "Insufficient balance for voice services"})
        await websocket.close(code=1008)
        return

    # Queue items are (audioBytes, flag) tuples; (b"", True) is enqueued on
    # close/restart — presumably the end-of-stream marker for the recognizer.
    audioQueue: asyncio.Queue = asyncio.Queue()
    language = _STT_DEFAULT_LANGUAGE
    streamingTask: Optional[asyncio.Task] = None
    voiceInterface: Optional[VoiceObjects] = None

    async def _sendJson(payload: Dict[str, Any]) -> bool:
        """Send a JSON frame; return False instead of raising if the send fails."""
        try:
            await websocket.send_json(payload)
            return True
        except Exception:
            return False

    async def _runStreaming():
        """Consume ``audioQueue`` and forward recognition events to the client."""
        nonlocal voiceInterface
        voiceInterface = getVoiceInterface(currentUser, tokenMandateId)

        if billingService:
            def _billingCb(data):
                # Record usage only for events that carry a positive price.
                priceCHF = data.get("priceCHF", 0.0)
                operation = data.get("operation", "voice")
                if priceCHF > 0:
                    billingService.recordUsage(
                        priceCHF=priceCHF,
                        aicoreProvider="google-voice",
                        aicoreModel=operation,
                        description=f"Voice {operation}",
                    )
            voiceInterface.billingCallback = _billingCb

        try:
            async for event in voiceInterface.streamingSpeechToText(audioQueue, language):
                if event.get("reconnectRequired"):
                    await _sendJson({"type": "reconnect_required"})
                    return
                transcript = event.get("transcript")
                if not transcript:
                    # Events without text are not forwarded.
                    continue
                if event.get("isFinal"):
                    await _sendJson({"type": "final", "text": transcript, "confidence": event.get("confidence", 0.0)})
                else:
                    await _sendJson({"type": "interim", "text": transcript})
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"STT streaming error: {e}")
            await _sendJson({"type": "error", "message": str(e)})

    try:
        await _sendJson({"type": "status", "label": "STT stream connected"})

        while True:
            raw = await websocket.receive_text()

            # A single bad frame must not tear down the whole session.
            try:
                msg = json.loads(raw)
            except ValueError:  # json.JSONDecodeError is a ValueError subclass
                await _sendJson({"type": "error", "code": "invalid_message", "message": "Message is not valid JSON"})
                continue
            if not isinstance(msg, dict):
                await _sendJson({"type": "error", "code": "invalid_message", "message": "Message must be a JSON object"})
                continue

            rawType = msg.get("type")
            # Non-string types are treated like unknown types (ignored below).
            msgType = rawType.strip() if isinstance(rawType, str) else ""

            if msgType == "open":
                language = msg.get("language") or _STT_DEFAULT_LANGUAGE
                if streamingTask and not streamingTask.done():
                    # Restart: end the running stream and give the new one a fresh queue.
                    await audioQueue.put((b"", True))
                    streamingTask.cancel()
                    audioQueue = asyncio.Queue()
                streamingTask = asyncio.create_task(_runStreaming())
                await _sendJson({"type": "status", "label": "Listening..."})

            elif msgType == "audio":
                chunkB64 = msg.get("chunk")
                if not chunkB64:
                    continue
                try:
                    chunkBytes = base64.b64decode(chunkB64)
                except (ValueError, TypeError):
                    # binascii.Error (bad padding/characters) is a ValueError
                    # subclass; TypeError covers a non-string chunk.
                    await _sendJson({"type": "error", "code": "chunk_invalid", "message": "Audio chunk is not valid base64"})
                    continue
                if len(chunkBytes) > _STT_MAX_CHUNK_BYTES:
                    await _sendJson({"type": "error", "code": "chunk_too_large", "message": "Audio chunk too large"})
                    continue
                await audioQueue.put((chunkBytes, False))

            elif msgType == "close":
                await audioQueue.put((b"", True))
                if streamingTask:
                    # Give the recognizer up to 10 s to deliver remaining results.
                    try:
                        await asyncio.wait_for(streamingTask, timeout=10.0)
                    except (asyncio.TimeoutError, asyncio.CancelledError):
                        pass
                await _sendJson({"type": "closed"})
                await websocket.close()
                break

            elif msgType == "ping":
                await _sendJson({"type": "pong"})

    except WebSocketDisconnect:
        logger.info(f"STT WebSocket disconnected: userId={tokenUserId}")
    except Exception as e:
        logger.error(f"STT WebSocket error: {e}", exc_info=True)
        try:
            await websocket.send_json({"type": "error", "message": str(e)})
        except Exception:
            # Socket is already unusable; nothing more can be reported.
            pass
    finally:
        # Always signal end-of-stream and stop any recognizer still running.
        await audioQueue.put((b"", True))
        if streamingTask and not streamingTask.done():
            streamingTask.cancel()