# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for Voice Services
Provides a generic interface layer between routes and voice connectors.
Handles voice operations including speech-to-text, text-to-speech, and translation.
"""

import logging
from typing import Dict, Any, Optional, List

from modules.connectors.connectorVoiceGoogle import ConnectorGoogleSpeech
from modules.datamodels.datamodelVoice import VoiceSettings
from modules.datamodels.datamodelUam import User
from modules.shared.timeUtils import getUtcTimestamp

logger = logging.getLogger(__name__)

# Singleton factory for Voice instances
# NOTE(review): this registry is not referenced by the code visible in this
# module; getVoiceInterface() builds a fresh VoiceObjects on every call.
_instancesVoice = {}


class VoiceObjects:
    """
    Interface for Voice Services.
    Provides a generic interface layer between routes and voice connectors.

    Every service method delegates to a lazily created ConnectorGoogleSpeech
    and converts exceptions into an error dict (or None for the settings
    helpers) instead of letting them propagate to the caller.
    """

    def __init__(self):
        """Initialize the Voice Interface with an empty user context."""
        self.currentUser: Optional[User] = None
        self.userId: Optional[str] = None
        # Initialised here so that the settings helpers can read it even when
        # setUserContext() was never called; otherwise they would fail with an
        # AttributeError instead of the intended "mandateId is required" error.
        self.mandateId: Optional[str] = None
        self._google_speech_connector: Optional[ConnectorGoogleSpeech] = None

    def setUserContext(self, currentUser: User, mandateId: Optional[str] = None):
        """Set the user context for the interface.

        Args:
            currentUser: The authenticated user
            mandateId: The mandate ID from RequestContext (X-Mandate-Id header)

        Raises:
            ValueError: If the given user has no (truthy) id.

        If currentUser is falsy, the call only logs a message and leaves the
        instance unchanged.
        """
        if not currentUser:
            logger.info("Initializing voice interface without user context")
            return

        self.currentUser = currentUser
        self.userId = currentUser.id
        # Use mandateId from parameter (Request-Context), not from user object
        self.mandateId = mandateId

        if not self.userId:
            raise ValueError("Invalid user context: id is required")

        logger.debug(f"Voice interface user context set: userId={self.userId}, mandateId={self.mandateId}")

    def _getGoogleSpeechConnector(self) -> ConnectorGoogleSpeech:
        """Get or create Google Cloud Speech connector instance.

        The connector is created on first use and cached on the instance.
        Initialization errors are logged and re-raised.
        """
        if self._google_speech_connector is None:
            try:
                self._google_speech_connector = ConnectorGoogleSpeech()
                logger.info("✅ Google Cloud Speech connector initialized")
            except Exception as e:
                logger.error(f"❌ Failed to initialize Google Cloud Speech connector: {e}")
                raise
        return self._google_speech_connector

    # Speech-to-Text Operations

    async def speechToText(self, audioContent: bytes, language: str = "de-DE",
                           sampleRate: Optional[int] = None,
                           channels: Optional[int] = None) -> Dict[str, Any]:
        """
        Convert speech to text using Google Cloud Speech-to-Text API.

        Args:
            audioContent: Raw audio data
            language: Language code (e.g., 'de-DE', 'en-US')
            sampleRate: Audio sample rate (auto-detected if None)
            channels: Number of audio channels (auto-detected if None)

        Returns:
            Dict containing transcribed text, confidence, and metadata.
            On any exception a dict with success=False, empty text,
            confidence 0.0 and the error message is returned.
        """
        try:
            logger.info(f"🎤 Speech-to-text request: {len(audioContent)} bytes, language: {language}")

            connector = self._getGoogleSpeechConnector()
            result = await connector.speechToText(
                audioContent=audioContent,
                language=language,
                sampleRate=sampleRate,
                channels=channels
            )

            if result["success"]:
                logger.info(f"✅ Speech-to-text successful: '{result['text']}' (confidence: {result['confidence']:.2f})")
            else:
                logger.warning(f"⚠️ Speech-to-text failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Speech-to-text error: {e}")
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "error": str(e)
            }

    # Translation Operations

    async def detectLanguage(self, text: str) -> Dict[str, Any]:
        """
        Detect the language of text using Google Cloud Translation API.

        Args:
            text: Text to detect language for

        Returns:
            Dict containing detected language code and confidence.
            Whitespace-only text and any exception yield a dict with
            success=False, an empty language and an error message.
        """
        try:
            logger.info(f"🔍 Language detection request: '{text[:100]}...'")

            if not text.strip():
                return {
                    "success": False,
                    "language": "",
                    "error": "Empty text provided"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.detectLanguage(text)

            if result["success"]:
                logger.info(f"✅ Language detected: {result['language']}")
            else:
                logger.warning(f"⚠️ Language detection failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Language detection error: {e}")
            return {
                "success": False,
                "language": "",
                "error": str(e)
            }

    async def translateText(self, text: str, sourceLanguage: str = "de",
                            targetLanguage: str = "en") -> Dict[str, Any]:
        """
        Translate text using Google Cloud Translation API.

        Args:
            text: Text to translate
            sourceLanguage: Source language code (e.g., 'de', 'en')
            targetLanguage: Target language code (e.g., 'en', 'de')

        Returns:
            Dict containing translated text and metadata.
            Whitespace-only text and any exception yield a dict with
            success=False, an empty translated_text and an error message.
        """
        try:
            logger.info(f"🌐 Translation request: '{text}' ({sourceLanguage} -> {targetLanguage})")

            if not text.strip():
                return {
                    "success": False,
                    "translated_text": "",
                    "error": "Empty text provided"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.translateText(
                text=text,
                sourceLanguage=sourceLanguage,
                targetLanguage=targetLanguage
            )

            if result["success"]:
                logger.info(f"✅ Translation successful: '{result['translated_text']}'")
            else:
                logger.warning(f"⚠️ Translation failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Translation error: {e}")
            return {
                "success": False,
                "translated_text": "",
                "error": str(e)
            }

    # Combined Operations

    async def speechToTranslatedText(self, audioContent: bytes, fromLanguage: str = "de-DE",
                                     toLanguage: str = "en") -> Dict[str, Any]:
        """
        Complete pipeline: Speech-to-Text + Translation.

        Args:
            audioContent: Raw audio data
            fromLanguage: Source language for speech recognition
            toLanguage: Target language for translation

        Returns:
            Dict containing original text, translated text, and metadata.
            On any exception a dict with success=False, empty texts and the
            error message is returned.
        """
        try:
            logger.info(f"🔄 Speech-to-translation pipeline: {fromLanguage} -> {toLanguage}")

            connector = self._getGoogleSpeechConnector()
            result = await connector.speechToTranslatedText(
                audioContent=audioContent,
                fromLanguage=fromLanguage,
                toLanguage=toLanguage
            )

            if result["success"]:
                logger.info("✅ Complete pipeline successful:")
                logger.info(f" Original: '{result['original_text']}'")
                logger.info(f" Translated: '{result['translated_text']}'")
            else:
                logger.warning(f"⚠️ Speech-to-translation pipeline failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Speech-to-translation pipeline error: {e}")
            return {
                "success": False,
                "original_text": "",
                "translated_text": "",
                "error": str(e)
            }

    # Text-to-Speech Operations

    async def textToSpeech(self, text: str, languageCode: str = "de-DE",
                           voiceName: Optional[str] = None) -> Dict[str, Any]:
        """
        Convert text to speech using Google Cloud Text-to-Speech.

        Args:
            text: Text to convert to speech
            languageCode: Language code (e.g., 'de-DE', 'en-US')
            voiceName: Specific voice name (optional)

        Returns:
            Dict with success status and audio data.
            Whitespace-only text and any exception yield a dict with
            success=False and an error message.
        """
        try:
            logger.info(f"🔊 Text-to-Speech request: '{text[:50]}...' in {languageCode}")

            if not text.strip():
                return {
                    "success": False,
                    "error": "Empty text provided for text-to-speech"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.textToSpeech(
                text=text,
                languageCode=languageCode,
                voiceName=voiceName
            )

            if result["success"]:
                logger.info(f"✅ Text-to-Speech successful: {len(result['audio_content'])} bytes")
            else:
                logger.warning(f"⚠️ Text-to-Speech failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Text-to-Speech error: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    # Voice Settings Management

    def getVoiceSettings(self, userId: str) -> Optional[VoiceSettings]:
        """
        Get voice settings for a user.

        Currently a stub: it only logs the request and always returns None.

        Args:
            userId: User ID to get settings for

        Returns:
            VoiceSettings object or None if not found (always None for now)
        """
        try:
            # This would typically query the database
            # For now, return None as this is handled by the database interface
            logger.debug(f"Getting voice settings for user: {userId}")
            return None
        except Exception as e:
            logger.error(f"❌ Error getting voice settings: {e}")
            return None

    def createVoiceSettings(self, settingsData: Dict[str, Any]) -> Optional[VoiceSettings]:
        """
        Create new voice settings.

        Note: settingsData is modified in place (mandateId, creationDate and
        lastModified are written into the dict passed by the caller).

        Args:
            settingsData: Dictionary containing voice settings data

        Returns:
            Created VoiceSettings object or None if failed (including the case
            where neither settingsData nor the context provides a mandateId)
        """
        try:
            logger.info(f"Creating voice settings: {settingsData}")

            # Ensure mandateId is set from context if not provided
            if "mandateId" not in settingsData or not settingsData["mandateId"]:
                if not self.mandateId:
                    raise ValueError("mandateId is required but not provided and context has no mandateId")
                settingsData["mandateId"] = self.mandateId

            # Add timestamps
            currentTime = getUtcTimestamp()
            settingsData["creationDate"] = currentTime
            settingsData["lastModified"] = currentTime

            # Create VoiceSettings object
            voiceSettings = VoiceSettings(**settingsData)

            logger.info(f"✅ Voice settings created: {voiceSettings.id}")
            return voiceSettings

        except Exception as e:
            logger.error(f"❌ Error creating voice settings: {e}")
            return None

    def updateVoiceSettings(self, userId: str, settingsData: Dict[str, Any]) -> Optional[VoiceSettings]:
        """
        Update existing voice settings.

        Note: settingsData is modified in place (lastModified is written into
        the dict passed by the caller). userId is only used for logging here.

        Args:
            userId: User ID to update settings for
            settingsData: Dictionary containing updated voice settings data

        Returns:
            Updated VoiceSettings object or None if failed
        """
        try:
            logger.info(f"Updating voice settings for user {userId}: {settingsData}")

            # Add last modified timestamp
            settingsData["lastModified"] = getUtcTimestamp()

            # Create updated VoiceSettings object
            voiceSettings = VoiceSettings(**settingsData)

            logger.info(f"✅ Voice settings updated: {voiceSettings.id}")
            return voiceSettings

        except Exception as e:
            logger.error(f"❌ Error updating voice settings: {e}")
            return None

    def getOrCreateVoiceSettings(self, userId: str) -> Optional[VoiceSettings]:
        """
        Get existing voice settings or create default ones.

        Args:
            userId: User ID to get/create settings for

        Returns:
            VoiceSettings object, or None if creation failed
        """
        try:
            # Try to get existing settings
            existingSettings = self.getVoiceSettings(userId)

            if existingSettings:
                return existingSettings

            # Create default settings if none exist
            defaultSettings = {
                "userId": userId,
                "mandateId": self.mandateId,
                "sttLanguage": "de-DE",
                "ttsLanguage": "de-DE",
                "ttsVoice": "de-DE-Wavenet-A",
                "translationEnabled": True,
                "targetLanguage": "en-US"
            }

            return self.createVoiceSettings(defaultSettings)

        except Exception as e:
            logger.error(f"❌ Error getting or creating voice settings: {e}")
            return None

    # Language and Voice Information

    async def getAvailableLanguages(self) -> Dict[str, Any]:
        """
        Get available languages from Google Cloud Text-to-Speech.

        Returns:
            Dict containing success status and list of available languages.
            On any exception: success=False, the error message and an empty
            languages list.
        """
        try:
            logger.info("🌐 Getting available languages from Google Cloud TTS")

            connector = self._getGoogleSpeechConnector()
            result = await connector.getAvailableLanguages()

            if result["success"]:
                logger.info(f"✅ Found {len(result['languages'])} available languages")
            else:
                logger.warning(f"⚠️ Failed to get languages: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Error getting available languages: {e}")
            return {
                "success": False,
                "error": str(e),
                "languages": []
            }

    async def getAvailableVoices(self, languageCode: Optional[str] = None) -> Dict[str, Any]:
        """
        Get available voices from Google Cloud Text-to-Speech.

        Args:
            languageCode: Optional language code to filter voices

        Returns:
            Dict containing success status and list of available voices.
            On any exception: success=False, the error message and an empty
            voices list.
        """
        try:
            logger.info(f"🎤 Getting available voices, language filter: {languageCode}")

            connector = self._getGoogleSpeechConnector()
            result = await connector.getAvailableVoices(languageCode=languageCode)

            if result["success"]:
                logger.info(f"✅ Found {len(result['voices'])} voices for language filter: {languageCode}")
            else:
                logger.warning(f"⚠️ Failed to get voices: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Error getting available voices: {e}")
            return {
                "success": False,
                "error": str(e),
                "voices": []
            }

    # Audio Validation

    def validateAudioFormat(self, audioContent: bytes) -> Dict[str, Any]:
        """
        Validate audio format for Google Cloud Speech-to-Text.

        Args:
            audioContent: Raw audio data

        Returns:
            Dict containing validation results.
            On any exception: valid=False and the error message.
        """
        try:
            logger.debug(f"Validating audio format: {len(audioContent)} bytes")

            connector = self._getGoogleSpeechConnector()
            result = connector.validateAudioFormat(audioContent)

            if result["valid"]:
                logger.debug(f"✅ Audio validation successful: {result['format']}, {result['sample_rate']}Hz, {result['channels']}ch")
            else:
                logger.warning(f"⚠️ Audio validation failed: {result.get('error', 'Unknown error')}")

            return result

        except Exception as e:
            logger.error(f"❌ Audio validation error: {e}")
            return {
                "valid": False,
                "error": str(e)
            }

    # Health Check

    async def healthCheck(self) -> Dict[str, Any]:
        """
        Perform health check for voice services.

        The check translates the fixed text "Hello" from English to German
        through the connector and reports the outcome.

        Returns:
            Dict containing health status and test results
        """
        try:
            logger.info("🏥 Performing voice services health check")

            connector = self._getGoogleSpeechConnector()

            # Test with a simple translation
            testResult = await connector.translateText(
                text="Hello",
                sourceLanguage="en",
                targetLanguage="de"
            )

            if testResult["success"]:
                return {
                    "status": "healthy",
                    "service": "Google Cloud Speech-to-Text & Translation",
                    "test_translation": testResult["translated_text"]
                }
            else:
                return {
                    "status": "unhealthy",
                    "error": testResult.get("error", "Unknown error")
                }

        except Exception as e:
            logger.error(f"❌ Health check failed: {e}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }


def getVoiceInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None) -> VoiceObjects:
    """
    Factory function that creates a new Voice interface instance.

    Args:
        currentUser: User object for context (optional)
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header).
            It is converted to str and applied only together with a user
            context.

    Returns:
        VoiceObjects instance

    Raises:
        ValueError: Propagated from setUserContext() when currentUser has no id.
    """
    effectiveMandateId = str(mandateId) if mandateId else None

    voiceInterface = VoiceObjects()

    # NOTE(review): without a currentUser the supplied mandateId is not stored
    # on the instance (it stays None) — confirm this is intended.
    if currentUser:
        voiceInterface.setUserContext(currentUser, mandateId=effectiveMandateId)

    return voiceInterface