""" Google Cloud Voice Services Routes Replaces Azure voice services with Google Cloud Speech-to-Text and Translation """ import os import logging from fastapi import APIRouter, File, Form, UploadFile, Depends, HTTPException, Body from fastapi.responses import Response from typing import Optional, Dict, Any from modules.connectors.connectorGoogleSpeech import ConnectorGoogleSpeech from modules.security.auth import getCurrentUser from modules.interfaces.interfaceAppModel import User from modules.interfaces.interfaceComponentObjects import getInterface logger = logging.getLogger(__name__) router = APIRouter(prefix="/voice-google", tags=["Voice Google"]) # Global connector instance _google_speech_connector = None def get_google_speech_connector() -> ConnectorGoogleSpeech: """Get or create Google Cloud Speech connector instance.""" global _google_speech_connector if _google_speech_connector is None: try: _google_speech_connector = ConnectorGoogleSpeech() logger.info("✅ Google Cloud Speech connector initialized") except Exception as e: logger.error(f"❌ Failed to initialize Google Cloud Speech connector: {e}") raise HTTPException( status_code=500, detail=f"Failed to initialize Google Cloud Speech connector: {str(e)}" ) return _google_speech_connector @router.post("/speech-to-text") async def speech_to_text( audio_file: UploadFile = File(...), language: str = Form("de-DE"), current_user: User = Depends(getCurrentUser) ): """Convert speech to text using Google Cloud Speech-to-Text API.""" try: logger.info(f"🎤 Speech-to-text request: {audio_file.filename}, language: {language}") # Read audio file audio_content = await audio_file.read() logger.info(f"📊 Audio file size: {len(audio_content)} bytes") # Validate audio format connector = get_google_speech_connector() validation = connector.validate_audio_format(audio_content) if not validation["valid"]: raise HTTPException( status_code=400, detail=f"Invalid audio format: {validation.get('error', 'Unknown error')}" ) # Perform speech recognition result = await connector.speech_to_text( audio_content=audio_content, language=language ) if result["success"]: return { "success": True, "text": result["text"], "confidence": result["confidence"], "language": result["language"], "audio_info": { "size": len(audio_content), "format": validation["format"], "estimated_duration": validation.get("estimated_duration", 0) } } else: raise HTTPException( status_code=400, detail=f"Speech recognition failed: {result.get('error', 'Unknown error')}" ) except HTTPException: raise except Exception as e: logger.error(f"❌ Speech-to-text error: {e}") raise HTTPException( status_code=500, detail=f"Speech-to-text processing failed: {str(e)}" ) @router.post("/translate") async def translate_text( text: str = Form(...), source_language: str = Form("de"), target_language: str = Form("en"), current_user: User = Depends(getCurrentUser) ): """Translate text using Google Cloud Translation API.""" try: logger.info(f"🌐 Translation request: '{text}' ({source_language} -> {target_language})") if not text.strip(): raise HTTPException( status_code=400, detail="Empty text provided for translation" ) # Perform translation connector = get_google_speech_connector() result = await connector.translate_text( text=text, source_language=source_language, target_language=target_language ) if result["success"]: return { "success": True, "original_text": result["original_text"], "translated_text": result["translated_text"], "source_language": result["source_language"], "target_language": result["target_language"] } else: raise HTTPException( status_code=400, detail=f"Translation failed: {result.get('error', 'Unknown error')}" ) except HTTPException: raise except Exception as e: logger.error(f"❌ Translation error: {e}") raise HTTPException( status_code=500, detail=f"Translation processing failed: {str(e)}" ) @router.post("/realtime-interpreter") async def realtime_interpreter( audio_file: UploadFile = File(...), from_language: str = Form("de-DE"), to_language: str = Form("en-US"), connection_id: str = Form(None), current_user: User = Depends(getCurrentUser) ): """Real-time interpreter: speech to translated text using Google Cloud APIs.""" try: logger.info(f"🔄 Real-time interpreter request: {audio_file.filename}") logger.info(f" From: {from_language} -> To: {to_language}") logger.info(f" MIME type: {audio_file.content_type}") # Read audio file audio_content = await audio_file.read() logger.info(f"📊 Audio file size: {len(audio_content)} bytes") # Save audio file for debugging with correct extension file_extension = "webm" if audio_file.filename.endswith('.webm') else "wav" debug_filename = f"debug_audio/audio_google_{audio_file.filename.replace('.wav', '.webm')}" os.makedirs("debug_audio", exist_ok=True) with open(debug_filename, "wb") as f: f.write(audio_content) logger.info(f"💾 Saved audio file for debugging: {debug_filename}") # Validate audio format connector = get_google_speech_connector() validation = connector.validate_audio_format(audio_content) if not validation["valid"]: raise HTTPException( status_code=400, detail=f"Invalid audio format: {validation.get('error', 'Unknown error')}" ) # Perform complete pipeline: Speech-to-Text + Translation result = await connector.speech_to_translated_text( audio_content=audio_content, from_language=from_language, to_language=to_language ) if result["success"]: logger.info(f"✅ Real-time interpreter successful:") logger.info(f" Original: '{result['original_text']}'") logger.info(f" Translated: '{result['translated_text']}'") return { "success": True, "original_text": result["original_text"], "translated_text": result["translated_text"], "confidence": result["confidence"], "source_language": result["source_language"], "target_language": result["target_language"], "audio_info": { "size": len(audio_content), "format": validation["format"], "estimated_duration": validation.get("estimated_duration", 0) } } else: raise HTTPException( status_code=400, detail=f"Real-time interpreter failed: {result.get('error', 'Unknown error')}" ) except HTTPException: raise except Exception as e: logger.error(f"❌ Real-time interpreter error: {e}") raise HTTPException( status_code=500, detail=f"Real-time interpreter processing failed: {str(e)}" ) @router.post("/text-to-speech") async def text_to_speech( text: str = Form(...), language: str = Form("de-DE"), voice: str = Form(None), current_user: User = Depends(getCurrentUser) ): """Convert text to speech using Google Cloud Text-to-Speech.""" try: logger.info(f"Text-to-Speech request: '{text[:50]}...' in {language}") if not text.strip(): raise HTTPException( status_code=400, detail="Empty text provided for text-to-speech" ) connector = get_google_speech_connector() result = await connector.text_to_speech( text=text, language_code=language, voice_name=voice ) if result["success"]: return Response( content=result["audio_content"], media_type="audio/mpeg", headers={ "Content-Disposition": "attachment; filename=speech.mp3", "X-Voice-Name": result["voice_name"], "X-Language-Code": result["language_code"] } ) else: raise HTTPException( status_code=400, detail=f"Text-to-Speech failed: {result.get('error', 'Unknown error')}" ) except HTTPException: raise except Exception as e: logger.error(f"Text-to-Speech error: {e}") raise HTTPException( status_code=500, detail=f"Text-to-Speech processing failed: {str(e)}" ) @router.get("/health") async def health_check(current_user: User = Depends(getCurrentUser)): """Health check for Google Cloud voice services.""" try: connector = get_google_speech_connector() # Test with a simple translation test_result = await connector.translate_text( text="Hello", source_language="en", target_language="de" ) if test_result["success"]: return { "status": "healthy", "service": "Google Cloud Speech-to-Text & Translation", "test_translation": test_result["translated_text"] } else: return { "status": "unhealthy", "error": test_result.get("error", "Unknown error") } except Exception as e: logger.error(f"❌ Health check failed: {e}") return { "status": "unhealthy", "error": str(e) } @router.get("/settings") async def get_voice_settings(current_user: User = Depends(getCurrentUser)): """Get voice settings for the current user.""" try: logger.info(f"Getting voice settings for user: {current_user.id}") # Get database interface with user context interface = getInterface(current_user) # Get or create voice settings for the user voice_settings = interface.getOrCreateVoiceSettings(current_user.id) if voice_settings: # Return user settings return { "success": True, "data": { "user_settings": voice_settings.to_dict(), "default_settings": { "sttLanguage": "de-DE", "ttsLanguage": "de-DE", "ttsVoice": "de-DE-Wavenet-A", "translationEnabled": True, "targetLanguage": "en-US" } } } else: # Fallback to default settings if database fails logger.warning("Failed to get voice settings from database, using defaults") return { "success": True, "data": { "user_settings": None, "default_settings": { "sttLanguage": "de-DE", "ttsLanguage": "de-DE", "ttsVoice": "de-DE-Wavenet-A", "translationEnabled": True, "targetLanguage": "en-US" } } } except Exception as e: logger.error(f"Error getting voice settings: {e}") raise HTTPException( status_code=500, detail=f"Failed to get voice settings: {str(e)}" ) @router.post("/settings") async def save_voice_settings( settings: Dict[str, Any] = Body(...), current_user: User = Depends(getCurrentUser) ): """Save voice settings for the current user.""" try: logger.info(f"Saving voice settings for user: {current_user.id}") logger.info(f"Settings: {settings}") # Validate required settings required_fields = ["sttLanguage", "ttsLanguage", "ttsVoice"] for field in required_fields: if field not in settings: raise HTTPException( status_code=400, detail=f"Missing required field: {field}" ) # Set default values for optional fields if not provided if "translationEnabled" not in settings: settings["translationEnabled"] = True if "targetLanguage" not in settings: settings["targetLanguage"] = "en-US" # Get database interface with user context interface = getInterface(current_user) # Check if settings already exist for this user existing_settings = interface.getVoiceSettings(current_user.id) if existing_settings: # Update existing settings logger.info(f"Updating existing voice settings for user {current_user.id}") updated_settings = interface.updateVoiceSettings(current_user.id, settings) logger.info(f"Voice settings updated for user {current_user.id}: {updated_settings}") else: # Create new settings logger.info(f"Creating new voice settings for user {current_user.id}") # Add userId to settings settings["userId"] = current_user.id created_settings = interface.createVoiceSettings(settings) logger.info(f"Voice settings created for user {current_user.id}: {created_settings}") return { "success": True, "message": "Voice settings saved successfully", "data": settings } except HTTPException: raise except Exception as e: logger.error(f"Error saving voice settings: {e}") raise HTTPException( status_code=500, detail=f"Failed to save voice settings: {str(e)}" )