gateway/modules/interfaces/interfaceVoiceObjects.py
2026-04-19 00:19:42 +02:00

493 lines
18 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface for Voice Services
Provides a generic interface layer between routes and voice connectors.
Handles voice operations including speech-to-text, text-to-speech, and translation.
"""
import asyncio
import logging
from typing import AsyncGenerator, Callable, Dict, Any, Optional, List
from modules.connectors.connectorVoiceGoogle import ConnectorGoogleSpeech
from modules.datamodels.datamodelUam import User
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Registry apparently intended for cached Voice instances.
# NOTE(review): nothing in the visible part of this file reads or writes this
# dict, and getVoiceInterface() builds a new VoiceObjects on every call --
# confirm whether instance caching is still planned or this can be removed.
_instancesVoice = {}
class VoiceObjects:
    """
    Interface for Voice Services.

    Provides a generic interface layer between routes and voice connectors.
    Every operation is delegated to a lazily created ConnectorGoogleSpeech.
    The request/response methods catch all exceptions and report failures
    through the returned dict ("success": False / "valid": False /
    "status": "unhealthy" plus an "error" message) instead of raising.
    streamingSpeechToText is the exception: connector errors propagate.

    Attributes:
        currentUser: User set via setUserContext(), or None.
        userId: ID of currentUser, or None.
        mandateId: Mandate ID set via setUserContext(), or None.
        billingCallback: Optional callable invoked with one dict per billable
            operation (final streaming STT results and successful TTS calls).
    """

    def __init__(self):
        """Initialize the Voice Interface without user context or connector."""
        self.currentUser: Optional[User] = None
        self.userId: Optional[str] = None
        # Defined up front so reading mandateId before setUserContext() yields
        # None instead of raising AttributeError.
        self.mandateId: Optional[str] = None
        # Created on first use by _getGoogleSpeechConnector().
        self._google_speech_connector: Optional[ConnectorGoogleSpeech] = None
        self.billingCallback: Optional[Callable[[Dict[str, Any]], None]] = None

    def setUserContext(self, currentUser: "User", mandateId: Optional[str] = None):
        """Set the user context for the interface.

        Args:
            currentUser: The authenticated user. A falsy value leaves the
                current context untouched.
            mandateId: The mandate ID from RequestContext (X-Mandate-Id header)

        Raises:
            ValueError: If currentUser has no id. The previous context is
                kept unchanged in that case.
        """
        if not currentUser:
            logger.info("Initializing voice interface without user context")
            return

        # Validate before touching any state so a rejected user cannot leave
        # the interface with a half-applied context.
        userId = currentUser.id
        if not userId:
            raise ValueError("Invalid user context: id is required")

        self.currentUser = currentUser
        self.userId = userId
        # Use mandateId from parameter (Request-Context), not from user object
        self.mandateId = mandateId
        logger.debug(f"Voice interface user context set: userId={self.userId}, mandateId={self.mandateId}")

    def _getGoogleSpeechConnector(self) -> "ConnectorGoogleSpeech":
        """Get or create the Google Cloud Speech connector instance.

        Raises:
            Exception: Whatever the connector constructor raises; it is logged
                and re-raised unchanged.
        """
        if self._google_speech_connector is None:
            try:
                self._google_speech_connector = ConnectorGoogleSpeech()
                logger.info("✅ Google Cloud Speech connector initialized")
            except Exception as e:
                logger.error(f"❌ Failed to initialize Google Cloud Speech connector: {e}")
                raise
        return self._google_speech_connector

    # Speech-to-Text Operations
    async def speechToText(self, audioContent: bytes, language: str = "de-DE",
                           sampleRate: Optional[int] = None, channels: Optional[int] = None,
                           skipFallbacks: bool = False,
                           phraseHints: Optional[list] = None,
                           alternativeLanguages: Optional[list] = None) -> Dict[str, Any]:
        """
        Convert speech to text using Google Cloud Speech-to-Text API.

        Args:
            audioContent: Raw audio data
            language: Language code (e.g., 'de-DE', 'en-US')
            sampleRate: Audio sample rate (auto-detected if None)
            channels: Number of audio channels (auto-detected if None)
            skipFallbacks: If True, skip fallback attempts (use when audio format is known)
            phraseHints: Optional list of phrases to boost recognition (names, terms)
            alternativeLanguages: Optional list of additional language codes for multi-language

        Returns:
            The connector's result dict. On empty audio or any exception a
            dict with "success": False, empty "text", "confidence" 0.0 and
            an "error" message is returned instead.
        """
        try:
            # Reject missing/empty audio up front, mirroring the empty-text
            # guards of the text based operations.
            if not audioContent:
                return {
                    "success": False,
                    "text": "",
                    "confidence": 0.0,
                    "error": "Empty audio provided"
                }

            logger.info(f"Speech-to-text request: {len(audioContent)} bytes, language: {language}")
            connector = self._getGoogleSpeechConnector()
            result = await connector.speechToText(
                audioContent=audioContent,
                language=language,
                sampleRate=sampleRate,
                channels=channels,
                skipFallbacks=skipFallbacks,
                phraseHints=phraseHints,
                alternativeLanguages=alternativeLanguages,
            )

            if result["success"]:
                # .get() keeps a missing optional key from turning a
                # successful transcription into a reported failure.
                confidence = result.get("confidence") or 0.0
                logger.info(f"✅ Speech-to-text successful: '{result.get('text', '')}' (confidence: {confidence:.2f})")
            else:
                logger.warning(f"⚠️ Speech-to-text failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Speech-to-text error: {e}")
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "error": str(e)
            }

    async def streamingSpeechToText(
        self,
        audioQueue: asyncio.Queue,
        language: str = "de-DE",
        phraseHints: Optional[list] = None,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Stream audio to Google Streaming STT and yield interim/final results.

        Every event produced by the connector is yielded unchanged. For
        events flagged "isFinal", billing is reported through billingCallback
        (if set) when the calculated price is greater than zero.

        Args:
            audioQueue: Queue handed to the connector's streamingRecognize();
                the item protocol is defined by the connector.
            language: Language code (e.g., 'de-DE', 'en-US')
            phraseHints: Optional list of phrases to boost recognition

        Yields:
            Event dicts from the connector.
        """
        connector = self._getGoogleSpeechConnector()
        async for event in connector.streamingRecognize(audioQueue, language, phraseHints):
            if event.get("isFinal") and self.billingCallback:
                # Billing is best effort: neither the price calculation nor
                # the callback may interrupt the transcription stream.
                try:
                    durationSec = event.get("audioDurationSec", 0) or 0
                    priceCHF = connector.calculateSttCostCHF(durationSec)
                    if priceCHF > 0:
                        self.billingCallback({
                            "operation": "stt-streaming",
                            "priceCHF": priceCHF,
                            "audioDurationSec": durationSec,
                        })
                except Exception as e:
                    logger.warning(f"Voice STT billing callback failed: {e}")
            yield event

    # Translation Operations
    async def detectLanguage(self, text: str) -> Dict[str, Any]:
        """
        Detect the language of text using Google Cloud Translation API.

        Args:
            text: Text to detect language for

        Returns:
            The connector's result dict. For None/blank text or any exception
            a dict with "success": False, empty "language" and an "error"
            message is returned instead.
        """
        try:
            preview = (text or "")[:100]
            logger.info(f"🔍 Language detection request: '{preview}...'")

            # `not text` also covers None, which would otherwise fail on .strip().
            if not text or not text.strip():
                return {
                    "success": False,
                    "language": "",
                    "error": "Empty text provided"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.detectLanguage(text)

            if result["success"]:
                logger.info(f"✅ Language detected: {result.get('language')}")
            else:
                logger.warning(f"⚠️ Language detection failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Language detection error: {e}")
            return {
                "success": False,
                "language": "",
                "error": str(e)
            }

    async def translateText(self, text: str,
                            sourceLanguage: Optional[str] = None,
                            targetLanguage: str = "en") -> Dict[str, Any]:
        """
        Translate text using Google Cloud Translation API.

        Args:
            text: Text to translate
            sourceLanguage: Source language ISO code (e.g. 'de', 'en'); pass None
                or 'auto' to let Google auto-detect.
            targetLanguage: Target language ISO code (e.g. 'en', 'de')

        Returns:
            The connector's result dict. For None/blank text or any exception
            a dict with "success": False, empty "translated_text" and an
            "error" message is returned instead.
        """
        try:
            logger.info(
                f"🌐 Translation request: '{text}' "
                f"({sourceLanguage or 'auto'} -> {targetLanguage})"
            )

            # `not text` also covers None, which would otherwise fail on .strip().
            if not text or not text.strip():
                return {
                    "success": False,
                    "translated_text": "",
                    "error": "Empty text provided"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.translateText(
                text=text,
                sourceLanguage=sourceLanguage,
                targetLanguage=targetLanguage
            )

            if result["success"]:
                logger.info(f"✅ Translation successful: '{result.get('translated_text', '')}'")
            else:
                logger.warning(f"⚠️ Translation failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Translation error: {e}")
            return {
                "success": False,
                "translated_text": "",
                "error": str(e)
            }

    # Combined Operations
    async def speechToTranslatedText(self, audioContent: bytes,
                                     fromLanguage: str = "de-DE",
                                     toLanguage: str = "en") -> Dict[str, Any]:
        """
        Complete pipeline: Speech-to-Text + Translation.

        Args:
            audioContent: Raw audio data
            fromLanguage: Source language for speech recognition
            toLanguage: Target language for translation

        Returns:
            The connector's result dict. On empty audio or any exception a
            dict with "success": False, empty "original_text" and
            "translated_text" and an "error" message is returned instead.
        """
        try:
            logger.info(f"🔄 Speech-to-translation pipeline: {fromLanguage} -> {toLanguage}")

            # Same empty-input policy as speechToText.
            if not audioContent:
                return {
                    "success": False,
                    "original_text": "",
                    "translated_text": "",
                    "error": "Empty audio provided"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.speechToTranslatedText(
                audioContent=audioContent,
                fromLanguage=fromLanguage,
                toLanguage=toLanguage
            )

            if result["success"]:
                logger.info("✅ Complete pipeline successful:")
                logger.info(f" Original: '{result.get('original_text', '')}'")
                logger.info(f" Translated: '{result.get('translated_text', '')}'")
            else:
                logger.warning(f"⚠️ Speech-to-translation pipeline failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Speech-to-translation pipeline error: {e}")
            return {
                "success": False,
                "original_text": "",
                "translated_text": "",
                "error": str(e)
            }

    # Text-to-Speech Operations
    async def textToSpeech(self, text: str, languageCode: str = "de-DE",
                           voiceName: Optional[str] = None) -> Dict[str, Any]:
        """
        Convert text to speech using Google Cloud Text-to-Speech.

        On success, billing is reported through billingCallback (if set) when
        the calculated price is greater than zero. A billing failure is only
        logged; the synthesized audio is still returned.

        Args:
            text: Text to convert to speech
            languageCode: Language code (e.g., 'de-DE', 'en-US')
            voiceName: Specific voice name (optional)

        Returns:
            On success a dict with "success": True, "audioContent",
            "audioFormat", "languageCode" and "voiceName". On connector
            failure the connector's result dict. For None/blank text or any
            exception a dict with "success": False and an "error" message.
        """
        try:
            preview = (text or "")[:50]
            logger.info(f"🔊 Text-to-Speech request: '{preview}...' in {languageCode}")

            # `not text` also covers None, which would otherwise fail on .strip().
            if not text or not text.strip():
                return {
                    "success": False,
                    "error": "Empty text provided for text-to-speech"
                }

            connector = self._getGoogleSpeechConnector()
            result = await connector.textToSpeech(
                text=text,
                languageCode=languageCode,
                voiceName=voiceName
            )

            if not result["success"]:
                logger.warning(f"⚠️ Text-to-Speech failed: {result.get('error', 'Unknown error')}")
                return result

            audioContent = result["audio_content"]
            logger.info(f"✅ Text-to-Speech successful: {len(audioContent)} bytes")

            if self.billingCallback:
                # Best effort: price calculation and callback share one guard
                # so a billing problem never discards the generated audio.
                try:
                    priceCHF = connector.calculateTtsCostCHF(len(text))
                    if priceCHF > 0:
                        self.billingCallback({
                            "operation": "tts-wavenet",
                            "priceCHF": priceCHF,
                            "characterCount": len(text),
                        })
                except Exception as e:
                    logger.warning(f"Voice TTS billing callback failed: {e}")

            return {
                "success": True,
                "audioContent": audioContent,
                "audioFormat": result.get("audio_format", "mp3"),
                "languageCode": result.get("language_code", languageCode),
                "voiceName": result.get("voice_name", voiceName),
            }
        except Exception as e:
            logger.error(f"❌ Text-to-Speech error: {e}")
            return {
                "success": False,
                "error": str(e)
            }

    # Language and Voice Information
    async def getAvailableLanguages(self) -> Dict[str, Any]:
        """
        Get available languages from Google Cloud Text-to-Speech.

        Returns:
            The connector's result dict. On any exception a dict with
            "success": False, an "error" message and an empty "languages" list.
        """
        try:
            logger.info("🌐 Getting available languages from Google Cloud TTS")
            connector = self._getGoogleSpeechConnector()
            result = await connector.getAvailableLanguages()

            if result["success"]:
                logger.info(f"✅ Found {len(result.get('languages') or [])} available languages")
            else:
                logger.warning(f"⚠️ Failed to get languages: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Error getting available languages: {e}")
            return {
                "success": False,
                "error": str(e),
                "languages": []
            }

    async def getAvailableVoices(self, languageCode: Optional[str] = None) -> Dict[str, Any]:
        """
        Get available voices from Google Cloud Text-to-Speech.

        Args:
            languageCode: Optional language code to filter voices

        Returns:
            The connector's result dict. On any exception a dict with
            "success": False, an "error" message and an empty "voices" list.
        """
        try:
            logger.info(f"🎤 Getting available voices, language filter: {languageCode}")
            connector = self._getGoogleSpeechConnector()
            result = await connector.getAvailableVoices(languageCode=languageCode)

            if result["success"]:
                logger.info(f"✅ Found {len(result.get('voices') or [])} voices for language filter: {languageCode}")
            else:
                logger.warning(f"⚠️ Failed to get voices: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Error getting available voices: {e}")
            return {
                "success": False,
                "error": str(e),
                "voices": []
            }

    # Audio Validation
    def validateAudioFormat(self, audioContent: bytes) -> Dict[str, Any]:
        """
        Validate audio format for Google Cloud Speech-to-Text.

        Args:
            audioContent: Raw audio data

        Returns:
            The connector's validation dict. On any exception a dict with
            "valid": False and an "error" message.
        """
        try:
            logger.debug(f"Validating audio format: {len(audioContent)} bytes")
            connector = self._getGoogleSpeechConnector()
            result = connector.validateAudioFormat(audioContent)

            if result["valid"]:
                # .get() so a missing descriptive key cannot flip a valid
                # result into a reported validation error.
                logger.debug(
                    f"✅ Audio validation successful: {result.get('format')}, "
                    f"{result.get('sample_rate')}Hz, {result.get('channels')}ch"
                )
            else:
                logger.warning(f"⚠️ Audio validation failed: {result.get('error', 'Unknown error')}")
            return result
        except Exception as e:
            logger.error(f"❌ Audio validation error: {e}")
            return {
                "valid": False,
                "error": str(e)
            }

    # Health Check
    async def healthCheck(self) -> Dict[str, Any]:
        """
        Perform health check for voice services.

        Translates the word "Hello" from English to German through the
        connector and reports the outcome.

        Returns:
            {"status": "healthy", "service": ..., "test_translation": ...}
            when the test translation succeeds, otherwise
            {"status": "unhealthy", "error": ...}.
        """
        try:
            logger.info("🏥 Performing voice services health check")
            connector = self._getGoogleSpeechConnector()

            # Test with a simple translation
            testResult = await connector.translateText(
                text="Hello",
                sourceLanguage="en",
                targetLanguage="de"
            )

            if not testResult["success"]:
                return {
                    "status": "unhealthy",
                    "error": testResult.get("error", "Unknown error")
                }
            return {
                "status": "healthy",
                "service": "Google Cloud Speech-to-Text & Translation",
                "test_translation": testResult["translated_text"]
            }
        except Exception as e:
            logger.error(f"❌ Health check failed: {e}")
            return {
                "status": "unhealthy",
                "error": str(e)
            }
def getVoiceInterface(currentUser: Optional[User] = None, mandateId: Optional[str] = None) -> VoiceObjects:
    """
    Factory function that creates a Voice interface instance.

    A new VoiceObjects is built on every call; instances are not cached.

    Args:
        currentUser: User object for context (optional). When omitted, no
            user context is set on the returned instance.
        mandateId: The mandate ID from RequestContext (X-Mandate-Id header).
            Optional; a truthy value is converted to str, anything else
            becomes None.

    Returns:
        VoiceObjects instance

    Raises:
        ValueError: Propagated from setUserContext() when currentUser has no id.
    """
    effectiveMandateId = str(mandateId) if mandateId else None
    voiceInterface = VoiceObjects()
    # Keep the mandate on the instance even without a user, so the attribute
    # always exists and the request's mandate is not silently dropped.
    voiceInterface.mandateId = effectiveMandateId
    if currentUser:
        voiceInterface.setUserContext(currentUser, mandateId=effectiveMandateId)
    return voiceInterface