# gateway/modules/connectors/connectorGoogleSpeech.py
# 2025-09-14 20:05:46 +02:00
#
# 285 lines
# 11 KiB
# Python

"""
Google Cloud Speech-to-Text and Translation Connector
Replaces Azure Speech Services with Google Cloud APIs
"""
import os
import io
import logging
import asyncio
from typing import Dict, Optional, Any
from google.cloud import speech
from google.cloud import translate_v2 as translate
logger = logging.getLogger(__name__)


class ConnectorGoogleSpeech:
    """
    Google Cloud Speech-to-Text and Translation connector.

    Wraps a ``speech.SpeechClient`` and a ``translate.Client`` and exposes
    coroutine helpers for speech recognition, text translation, and the
    combined speech -> translated-text pipeline. The coroutine helpers
    report failures through a ``"success": False`` result dict instead of
    raising.
    """

    def __init__(self, credentials_path: Optional[str] = None):
        """
        Initialize Google Cloud Speech and Translation clients.

        Args:
            credentials_path: Path to Google Cloud service account JSON file.
                When given, it is exported as GOOGLE_APPLICATION_CREDENTIALS,
                which changes the environment of the whole process.

        Raises:
            Exception: Whatever the client constructors raise; the error is
                logged and re-raised unchanged.
        """
        try:
            # Set up authentication
            if credentials_path:
                os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials_path

            # Initialize clients
            self.speech_client = speech.SpeechClient()
            self.translate_client = translate.Client()
            logger.info("✅ Google Cloud Speech and Translation clients initialized successfully")
        except Exception as e:
            logger.error(f"❌ Failed to initialize Google Cloud clients: {e}")
            raise

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in the default executor and await its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def speech_to_text(self, audio_content: bytes, language: str = "de-DE",
                             sample_rate: int = 16000, channels: int = 1) -> Dict:
        """
        Convert speech to text using Google Cloud Speech-to-Text API.

        Args:
            audio_content: Raw audio data (PCM format)
            language: Language code (e.g., 'de-DE', 'en-US')
            sample_rate: Audio sample rate (default: 16000 Hz)
            channels: Number of audio channels (default: 1)

        Returns:
            Dict with ``success``, ``text`` and ``confidence``. On success it
            also carries ``language`` and ``raw_result``; on failure it
            carries ``error``. ``text`` is the concatenation of the top
            alternative of every returned segment and ``confidence`` is the
            mean of those segments' confidences.
        """
        try:
            logger.info("🎤 Processing audio with Google Cloud Speech-to-Text")
            logger.info(f"📊 Audio: {len(audio_content)} bytes, {sample_rate}Hz, {channels}ch")

            # Configure audio settings
            audio = speech.RecognitionAudio(content=audio_content)
            config = speech.RecognitionConfig(
                encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=sample_rate,
                audio_channel_count=channels,
                language_code=language,
                enable_automatic_punctuation=True,
                model="latest_long",  # Use the latest model
            )

            # The client call is synchronous; run it off the event loop so
            # other coroutines keep running while the request is in flight.
            logger.info("🔄 Sending audio to Google Cloud Speech-to-Text...")
            response = await self._run_blocking(
                self.speech_client.recognize, config=config, audio=audio
            )

            if not response.results:
                logger.warning("⚠️ No recognition results from Google Cloud")
                return {
                    "success": False,
                    "text": "",
                    "confidence": 0.0,
                    "error": "No recognition results",
                }

            # Each result covers one consecutive portion of the audio, so the
            # full transcript is the top alternative of every result in order.
            segments = [
                result.alternatives[0]
                for result in response.results
                if result.alternatives
            ]
            if not segments:
                logger.warning("⚠️ No transcription alternatives found")
                return {
                    "success": False,
                    "text": "",
                    "confidence": 0.0,
                    "error": "No transcription alternatives found",
                }

            pieces = [segment.transcript.strip() for segment in segments]
            transcribed_text = " ".join(piece for piece in pieces if piece)
            confidence = sum(segment.confidence for segment in segments) / len(segments)

            logger.info(f"✅ Transcription successful: '{transcribed_text}' (confidence: {confidence:.2f})")
            return {
                "success": True,
                "text": transcribed_text,
                "confidence": confidence,
                "language": language,
                "raw_result": {
                    "transcript": transcribed_text,
                    "confidence": confidence,
                    "language_code": language,
                },
            }
        except Exception as e:
            logger.error(f"❌ Google Cloud Speech-to-Text error: {e}")
            return {
                "success": False,
                "text": "",
                "confidence": 0.0,
                "error": str(e),
            }

    async def translate_text(self, text: str, target_language: str = "en",
                             source_language: str = "de") -> Dict:
        """
        Translate text using Google Cloud Translation API.

        Args:
            text: Text to translate
            target_language: Target language code (e.g., 'en', 'de')
            source_language: Source language code (e.g., 'de', 'en')

        Returns:
            Dict with ``success`` and ``translated_text``. On success it also
            carries ``source_language``, ``target_language`` and
            ``original_text``; on failure it carries ``error``.
        """
        try:
            if not text.strip():
                logger.warning("⚠️ Empty text provided for translation")
                return {
                    "success": False,
                    "translated_text": "",
                    "error": "Empty text provided",
                }

            logger.info(f"🌐 Translating: '{text}' ({source_language} -> {target_language})")

            # format_="text" marks the input as plain text so the service does
            # not answer with HTML entities (e.g. "&#39;" for an apostrophe).
            # The blocking client call runs off the event loop.
            result = await self._run_blocking(
                self.translate_client.translate,
                text,
                source_language=source_language,
                target_language=target_language,
                format_="text",
            )

            translated_text = result['translatedText']
            detected_language = result.get('detectedSourceLanguage', source_language)
            logger.info(f"✅ Translation successful: '{translated_text}'")
            return {
                "success": True,
                "translated_text": translated_text,
                "source_language": detected_language,
                "target_language": target_language,
                "original_text": text,
            }
        except Exception as e:
            logger.error(f"❌ Google Cloud Translation error: {e}")
            return {
                "success": False,
                "translated_text": "",
                "error": str(e),
            }

    async def speech_to_translated_text(self, audio_content: bytes,
                                        from_language: str = "de-DE",
                                        to_language: str = "en",
                                        sample_rate: int = 16000,
                                        channels: int = 1) -> Dict:
        """
        Complete pipeline: Speech-to-Text + Translation.

        Args:
            audio_content: Raw audio data
            from_language: Source language for speech recognition
            to_language: Target language for translation
            sample_rate: Audio sample rate forwarded to speech recognition
                (default: 16000 Hz)
            channels: Number of audio channels forwarded to speech
                recognition (default: 1)

        Returns:
            Dict with ``success``, ``original_text`` and ``translated_text``.
            On success it also carries ``confidence``, ``source_language``
            and ``target_language``; on failure it carries ``error``.
        """
        try:
            logger.info(f"🔄 Starting speech-to-translation pipeline: {from_language} -> {to_language}")

            # Step 1: Speech-to-Text
            speech_result = await self.speech_to_text(
                audio_content=audio_content,
                language=from_language,
                sample_rate=sample_rate,
                channels=channels,
            )
            if not speech_result["success"]:
                return {
                    "success": False,
                    "original_text": "",
                    "translated_text": "",
                    "error": f"Speech recognition failed: {speech_result.get('error', 'Unknown error')}",
                }
            original_text = speech_result["text"]

            # Step 2: Translation
            # NOTE(review): the region suffix is dropped, so codes that differ
            # only by region (e.g. 'zh-CN' vs 'zh-TW') collapse to the same
            # base language here — confirm this is acceptable.
            translation_result = await self.translate_text(
                text=original_text,
                source_language=from_language.split('-')[0],  # Convert 'de-DE' to 'de'
                target_language=to_language.split('-')[0],  # Convert 'en-US' to 'en'
            )
            if not translation_result["success"]:
                return {
                    "success": False,
                    "original_text": original_text,
                    "translated_text": "",
                    "error": f"Translation failed: {translation_result.get('error', 'Unknown error')}",
                }
            translated_text = translation_result["translated_text"]

            logger.info("✅ Complete pipeline successful:")
            logger.info(f" Original: '{original_text}'")
            logger.info(f" Translated: '{translated_text}'")
            return {
                "success": True,
                "original_text": original_text,
                "translated_text": translated_text,
                "confidence": speech_result["confidence"],
                "source_language": from_language,
                "target_language": to_language,
            }
        except Exception as e:
            logger.error(f"❌ Speech-to-translation pipeline error: {e}")
            return {
                "success": False,
                "original_text": "",
                "translated_text": "",
                "error": str(e),
            }

    def validate_audio_format(self, audio_content: bytes,
                              sample_rate: int = 16000,
                              channels: int = 1) -> Dict:
        """
        Validate audio format for Google Cloud Speech-to-Text.

        Only cheap structural checks are done: a minimum length and an even
        byte count (16-bit samples). The audio is not decoded.

        Args:
            audio_content: Raw audio data
            sample_rate: Sample rate used for the duration estimate
                (default: 16000 Hz)
            channels: Channel count used for the duration estimate
                (default: 1)

        Returns:
            Dict with ``valid``. When valid it also carries ``format``,
            ``size`` (bytes) and ``estimated_duration`` (seconds, assuming
            16-bit samples); otherwise it carries ``error``.
        """
        try:
            # Google Cloud Speech-to-Text supports various formats
            # We'll do basic validation
            size = len(audio_content)
            if size < 100:
                return {
                    "valid": False,
                    "error": "Audio too short (less than 100 bytes)",
                }

            # Check if it looks like PCM audio (basic check)
            if size % 2 != 0:
                return {
                    "valid": False,
                    "error": "Audio data length is odd (not 16-bit PCM)",
                }

            if sample_rate <= 0 or channels <= 0:
                return {
                    "valid": False,
                    "error": "Sample rate and channel count must be positive",
                }

            return {
                "valid": True,
                "format": "pcm",
                "size": size,
                # Rough estimate: 2 bytes per sample per channel
                "estimated_duration": size / (sample_rate * channels * 2),
            }
        except Exception as e:
            return {
                "valid": False,
                "error": f"Validation error: {e}",
            }