# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Service - Pipeline Orchestrator.
Manages the audio processing pipeline: STT -> Context Buffer -> SPEECH_TEAMS -> TTS -> Bridge.
"""
import logging
import json
import asyncio
import time
import base64
from typing import Optional, Dict, Any, List
from fastapi import WebSocket
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.shared.timeUtils import getUtcTimestamp
from .datamodelTeamsbot import (
    TeamsbotSessionStatus,
    TeamsbotTranscript,
    TeamsbotBotResponse,
    TeamsbotResponseType,
    TeamsbotConfig,
    TeamsbotResponseMode,
    SpeechTeamsResponse,
)
from .bridgeConnector import BridgeConnector
logger = logging.getLogger(__name__)
# =========================================================================
# Minimal Service Context (for AI billing in bridge callbacks)
# =========================================================================
class _ServiceContext:
"""Minimal context providing user/mandate info for AiService billing.
Used by bridge callbacks where a full Services instance is not available."""
def __init__(self, user, mandateId, featureInstanceId=None):
self.user = user
self.mandateId = mandateId
self.featureInstanceId = featureInstanceId
self.featureCode = "teamsbot"
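
# Hypothetical construction, for reference only (the values are invented,
# not taken from this codebase):
#   ctx = _ServiceContext(user=currentUser, mandateId="mandate-1",
#                         featureInstanceId="instance-1")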
# =========================================================================
# Session Event Queues (for SSE streaming to frontend)
# =========================================================================
_sessionEvents: Dict[str, asyncio.Queue] = {}
async def _emitSessionEvent(sessionId: str, eventType: str, data: Any):
"""Emit an event to the session's SSE stream."""
eventQueue = _sessionEvents.get(sessionId)
if eventQueue:
await eventQueue.put({"type": eventType, "data": data, "timestamp": getUtcTimestamp()})
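
# Example (hypothetical) consumer: an SSE route can drain a session's queue
# roughly like this. The generator below is a sketch, not an endpoint defined
# in this module:
#
#   async def streamSessionEvents(sessionId: str):
#       queue = _sessionEvents.get(sessionId)
#       while queue is not None:
#           event = await queue.get()
#           yield f"data: {json.dumps(event)}\n\n"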
class TeamsbotService:
"""
Pipeline Orchestrator for Teams Bot sessions.
Coordinates VoiceObjects (STT/TTS), AiService (SPEECH_TEAMS), and Bridge communication.
"""
    def __init__(self, currentUser: User, mandateId: str, instanceId: str, config: TeamsbotConfig):
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.instanceId = instanceId
        self.config = config
        self.bridgeConnector = BridgeConnector(config._getEffectiveBridgeUrl())
        # State
        self._lastAiCallTime: float = 0.0
        self._contextBuffer: List[Dict[str, Any]] = []

    # =========================================================================
    # Session Lifecycle
    # =========================================================================
    async def joinMeeting(
        self,
        sessionId: str,
        meetingLink: str,
        connectionId: Optional[str] = None,
        gatewayBaseUrl: str = "",
    ):
        """Send join command to the .NET Media Bridge.

        Args:
            gatewayBaseUrl: Base URL of this gateway instance
                (e.g. https://gateway-prod.poweron-center.net). Passed to the
                bridge so it can build full callback/WS URLs per-session.
        """
        from . import interfaceFeatureTeamsbot as interfaceDb
        interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
        # Initialize SSE event queue
        _sessionEvents[sessionId] = asyncio.Queue()
        try:
            # Update status to JOINING
            interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.JOINING.value})
            await _emitSessionEvent(sessionId, "statusChange", {"status": "joining"})
            # Send join command to bridge
            session = interface.getSession(sessionId)
            if not session:
                raise ValueError(f"Session {sessionId} not found")
            result = await self.bridgeConnector.joinMeeting(
                meetingLink=meetingLink,
                botName=session.get("botName", self.config.botName),
                backgroundImageUrl=session.get("backgroundImageUrl"),
                sessionId=sessionId,
                gatewayCallbackUrl=f"/api/teamsbot/{self.instanceId}/bridge/status",
                gatewayWsUrl=f"/api/teamsbot/{self.instanceId}/bridge/audio/{sessionId}",
                gatewayBaseUrl=gatewayBaseUrl,
            )
            if result.get("success"):
                bridgeSessionId = result.get("bridgeSessionId")
                interface.updateSession(sessionId, {
                    "bridgeSessionId": bridgeSessionId,
                    "status": TeamsbotSessionStatus.ACTIVE.value,
                    "startedAt": getUtcTimestamp(),
                })
                await _emitSessionEvent(sessionId, "statusChange", {"status": "active"})
                logger.info(f"Bot joined meeting for session {sessionId}")
            else:
                errorMsg = result.get("error", "Unknown error joining meeting")
                interface.updateSession(sessionId, {
                    "status": TeamsbotSessionStatus.ERROR.value,
                    "errorMessage": errorMsg,
                })
                await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": errorMsg})
                logger.error(f"Failed to join meeting for session {sessionId}: {errorMsg}")
        except Exception as e:
            logger.error(f"Error joining meeting for session {sessionId}: {e}")
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ERROR.value,
                "errorMessage": str(e),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)})

    async def leaveMeeting(self, sessionId: str):
        """Send leave command to the .NET Media Bridge."""
        from . import interfaceFeatureTeamsbot as interfaceDb
        interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
        try:
            interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.LEAVING.value})
            await _emitSessionEvent(sessionId, "statusChange", {"status": "leaving"})
            await self.bridgeConnector.leaveMeeting(sessionId)
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ENDED.value,
                "endedAt": getUtcTimestamp(),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "ended"})
            # Generate meeting summary in background
            asyncio.create_task(self._generateMeetingSummary(sessionId))
            logger.info(f"Bot left meeting for session {sessionId}")
        except Exception as e:
            logger.error(f"Error leaving meeting for session {sessionId}: {e}")
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ERROR.value,
                "errorMessage": str(e),
                "endedAt": getUtcTimestamp(),
            })
        # Cleanup event queue
        _sessionEvents.pop(sessionId, None)

    # =========================================================================
    # Audio Processing Pipeline
    # =========================================================================
    async def handleAudioStream(self, websocket: WebSocket, sessionId: str):
        """
        Main audio processing loop.
        Receives PCM audio from the bridge via WebSocket and processes it
        through the STT -> AI -> TTS pipeline.
        """
        from . import interfaceFeatureTeamsbot as interfaceDb
        from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
        interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
        voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
        audioBuffer = bytearray()
        bufferDurationMs = 0
        targetBufferMs = 1500  # Buffer 1.5 seconds of audio before STT
        # PCM16 at 16kHz mono = 32000 bytes/second
        bytesPerSecond = 32000
        bytesPerMs = bytesPerSecond / 1000
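        # Worked example: 16000 samples/s x 2 bytes/sample = 32000 B/s,
        # i.e. 32 bytes per millisecond, so the 1500 ms target buffer
        # triggers an STT call at roughly 48000 bytes.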
logger.info(f"Audio processing started for session {sessionId}")
try:
while True:
# Receive audio data from bridge
data = await websocket.receive()
if "bytes" in data:
audioChunk = data["bytes"]
elif "text" in data:
# JSON message (control messages)
message = json.loads(data["text"])
msgType = message.get("type")
if msgType == "audio_chunk":
audioChunk = base64.b64decode(message.get("data", ""))
elif msgType == "session_ended":
logger.info(f"Bridge signaled session ended: {sessionId}")
break
elif msgType == "ping":
await websocket.send_text(json.dumps({"type": "pong"}))
continue
else:
continue
else:
continue
# Accumulate audio in buffer
audioBuffer.extend(audioChunk)
bufferDurationMs = len(audioBuffer) / bytesPerMs
# Process when buffer has enough audio
if bufferDurationMs >= targetBufferMs:
await self._processAudioBuffer(
bytes(audioBuffer),
sessionId,
interface,
voiceInterface,
websocket,
)
audioBuffer.clear()
bufferDurationMs = 0
except Exception as e:
if "disconnect" not in str(e).lower():
logger.error(f"Audio stream error for session {sessionId}: {e}")
# Process remaining buffer
if len(audioBuffer) > 0:
await self._processAudioBuffer(
bytes(audioBuffer),
sessionId,
interface,
voiceInterface,
websocket,
)
logger.info(f"Audio processing ended for session {sessionId}")

    async def _processAudioBuffer(
        self,
        audioBytes: bytes,
        sessionId: str,
        interface,
        voiceInterface,
        websocket: WebSocket,
    ):
        """Process a buffered audio chunk through the STT -> AI -> TTS pipeline."""
        # Step 1: STT -- convert audio to text
        try:
            sttResult = await voiceInterface.speechToText(
                audioContent=audioBytes,
                language=self.config.language,
                sampleRate=16000,
                channels=1
            )
        except Exception as e:
            logger.warning(f"STT failed for session {sessionId}: {e}")
            return
        transcriptText = sttResult.get("text", "").strip() if isinstance(sttResult, dict) else str(sttResult).strip()
        if not transcriptText:
            return
        confidence = sttResult.get("confidence", 0.0) if isinstance(sttResult, dict) else 0.0
        speaker = sttResult.get("speaker") if isinstance(sttResult, dict) else None
        # Store transcript segment
        transcriptData = TeamsbotTranscript(
            sessionId=sessionId,
            speaker=speaker,
            text=transcriptText,
            timestamp=str(getUtcTimestamp()),
            confidence=confidence,
            language=self.config.language,
            isFinal=True,
        ).model_dump()
        createdTranscript = interface.createTranscript(transcriptData)
        # Update context buffer
        self._contextBuffer.append({
            "speaker": speaker or "Unknown",
            "text": transcriptText,
            "timestamp": getUtcTimestamp(),
        })
        # Keep only last N segments
        maxSegments = self.config.contextWindowSegments
        if len(self._contextBuffer) > maxSegments:
            self._contextBuffer = self._contextBuffer[-maxSegments:]
        # Emit SSE event for live transcript
        await _emitSessionEvent(sessionId, "transcript", {
            "id": createdTranscript.get("id"),
            "speaker": speaker,
            "text": transcriptText,
            "confidence": confidence,
            "timestamp": getUtcTimestamp(),
        })
        # Update session transcript count
        session = interface.getSession(sessionId)
        if session:
            count = session.get("transcriptSegmentCount", 0) + 1
            interface.updateSession(sessionId, {"transcriptSegmentCount": count})
        # Step 2: Check if AI analysis should be triggered
        if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY:
            return
        if not self._shouldTriggerAnalysis(transcriptText):
            return
        # Step 3: SPEECH_TEAMS AI analysis
        await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript)

    def _shouldTriggerAnalysis(self, transcriptText: str) -> bool:
        """
        Decide whether to trigger AI analysis based on the latest transcript.

        Triggers (both gated by the cooldown):
        - Bot name mentioned (immediate)
        - Periodic interval elapsed
        """
        now = time.time()
        # Cooldown check
        timeSinceLastCall = now - self._lastAiCallTime
        if timeSinceLastCall < self.config.triggerCooldownSeconds:
            return False
        # Bot name mentioned -> immediate trigger
        botNameLower = self.config.botName.lower()
        if botNameLower in transcriptText.lower():
            logger.debug(f"Trigger: Bot name '{self.config.botName}' detected in transcript")
            return True
        # Periodic trigger
        if timeSinceLastCall >= self.config.triggerIntervalSeconds:
            logger.debug(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed")
            return True
        return False
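
    # Example timing, assuming a hypothetical config of cooldown=10s and
    # interval=60s (illustrative values, not defaults from this codebase):
    #   t=0s   bot name heard       -> AI call (cooldown window starts)
    #   t=5s   bot name heard again -> suppressed (still within cooldown)
    #   t=70s  ordinary speech      -> AI call (periodic interval elapsed)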

    async def _analyzeAndRespond(
        self,
        sessionId: str,
        interface,
        voiceInterface,
        websocket: WebSocket,
        triggerTranscript: Dict[str, Any],
    ):
        """Run SPEECH_TEAMS AI analysis and respond if needed."""
        self._lastAiCallTime = time.time()
        # Build transcript context from buffer
        contextLines = []
        for segment in self._contextBuffer:
            speaker = segment.get("speaker", "Unknown")
            text = segment.get("text", "")
            contextLines.append(f"[{speaker}]: {text}")
        transcriptContext = f"BOT_NAME:{self.config.botName}\n" + "\n".join(contextLines)
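        # For illustration (names invented), the assembled context looks like:
        #   BOT_NAME:Aria
        #   [Alice]: Can we move the release to Friday?
        #   [Bob]: Fine by me. Aria, are there any open blockers?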
        # Call SPEECH_TEAMS
        try:
            from modules.services.serviceAi.mainServiceAi import AiService
            # Create minimal service context for AI billing
            serviceContext = _ServiceContext(self.currentUser, self.mandateId, self.instanceId)
            aiService = AiService(serviceCenter=serviceContext)
            await aiService.ensureAiObjectsInitialized()
            request = AiCallRequest(
                prompt=self.config.aiSystemPrompt,
                context=transcriptContext,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.SPEECH_TEAMS,
                    priority=PriorityEnum.SPEED,
                )
            )
            response = await aiService.callAi(request)
            # Parse structured response
            try:
                speechResult = SpeechTeamsResponse.model_validate_json(response.content)
            except Exception:
                # Try to extract JSON from response content
                try:
                    jsonStr = response.content
                    if "```json" in jsonStr:
                        jsonStr = jsonStr.split("```json")[1].split("```")[0]
                    elif "```" in jsonStr:
                        jsonStr = jsonStr.split("```")[1].split("```")[0]
                    speechResult = SpeechTeamsResponse.model_validate_json(jsonStr.strip())
                except Exception as parseErr:
                    logger.warning(f"Failed to parse SPEECH_TEAMS response: {parseErr}")
                    speechResult = SpeechTeamsResponse(
                        shouldRespond=False,
                        reasoning=f"Parse error: {str(parseErr)[:100]}",
                        detectedIntent="none"
                    )
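            # For reference, a fenced model reply like the following (illustrative)
            # is unwrapped by the fallback above before validation:
            #   ```json
            #   {"shouldRespond": true, "detectedIntent": "question",
            #    "reasoning": "...", "responseText": "..."}
            #   ```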
            logger.info(
                f"SPEECH_TEAMS result: shouldRespond={speechResult.shouldRespond}, "
                f"intent={speechResult.detectedIntent}, "
                f"reasoning={speechResult.reasoning[:80]}..."
            )
            # Emit analysis event (always, for debug/UI)
            await _emitSessionEvent(sessionId, "analysis", {
                "shouldRespond": speechResult.shouldRespond,
                "detectedIntent": speechResult.detectedIntent,
                "reasoning": speechResult.reasoning,
                "modelName": response.modelName,
                "processingTime": response.processingTime,
                "priceCHF": response.priceCHF,
            })
            # Step 4: Respond if AI decided to
            if speechResult.shouldRespond and speechResult.responseText:
                if self.config.responseMode == TeamsbotResponseMode.MANUAL:
                    # In manual mode, suggest but don't send
                    await _emitSessionEvent(sessionId, "suggestedResponse", {
                        "responseText": speechResult.responseText,
                        "detectedIntent": speechResult.detectedIntent,
                        "reasoning": speechResult.reasoning,
                    })
                    return
                # Auto mode: send voice + chat response
                responseType = TeamsbotResponseType.BOTH
                # 4a: TTS -> Audio to bridge
                try:
                    ttsResult = await voiceInterface.textToSpeech(
                        text=speechResult.responseText,
                        languageCode=self.config.language,
                        voiceName=self.config.voiceId
                    )
                    if ttsResult and isinstance(ttsResult, dict):
                        audioContent = ttsResult.get("audioContent")
                        if audioContent:
                            # Send TTS audio to bridge
                            await websocket.send_text(json.dumps({
                                "type": "tts_audio",
                                "data": base64.b64encode(audioContent if isinstance(audioContent, bytes) else audioContent.encode()).decode(),
                                "format": "mp3",
                            }))
                except Exception as ttsErr:
                    logger.warning(f"TTS failed for session {sessionId}: {ttsErr}")
                    responseType = TeamsbotResponseType.CHAT  # Fallback to chat only
                # 4b: Store bot response
                botResponseData = TeamsbotBotResponse(
                    sessionId=sessionId,
                    responseText=speechResult.responseText,
                    responseType=responseType,
                    detectedIntent=speechResult.detectedIntent,
                    reasoning=speechResult.reasoning,
                    triggeredByTranscriptId=triggerTranscript.get("id"),
                    modelName=response.modelName,
                    processingTime=response.processingTime,
                    priceCHF=response.priceCHF,
                    timestamp=str(getUtcTimestamp()),
                ).model_dump()
                createdResponse = interface.createBotResponse(botResponseData)
                # 4c: Emit SSE event
                await _emitSessionEvent(sessionId, "botResponse", {
                    "id": createdResponse.get("id"),
                    "responseText": speechResult.responseText,
                    "responseType": responseType.value,
                    "detectedIntent": speechResult.detectedIntent,
                    "reasoning": speechResult.reasoning,
                    "modelName": response.modelName,
                    "processingTime": response.processingTime,
                    "priceCHF": response.priceCHF,
                })
                # Update session response count
                session = interface.getSession(sessionId)
                if session:
                    count = session.get("botResponseCount", 0) + 1
                    interface.updateSession(sessionId, {"botResponseCount": count})
                logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}")
        except Exception as e:
            logger.error(f"SPEECH_TEAMS analysis failed for session {sessionId}: {e}")
            await _emitSessionEvent(sessionId, "error", {"message": f"AI analysis failed: {str(e)}"})

    # =========================================================================
    # Meeting Summary
    # =========================================================================
    async def _generateMeetingSummary(self, sessionId: str):
        """Generate an AI summary of the meeting after it ends."""
        try:
            from . import interfaceFeatureTeamsbot as interfaceDb
            interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
            transcripts = interface.getTranscripts(sessionId)
            if not transcripts or len(transcripts) < 5:
                return  # Not enough content for a summary
            # Build full transcript
            fullTranscript = "\n".join(
                f"[{t.get('speaker', 'Unknown')}]: {t.get('text', '')}"
                for t in transcripts
            )
            from modules.services.serviceAi.mainServiceAi import AiService
            serviceContext = _ServiceContext(self.currentUser, self.mandateId, self.instanceId)
            aiService = AiService(serviceCenter=serviceContext)
            await aiService.ensureAiObjectsInitialized()
            # German prompt; in English: "Create a short summary of this meeting
            # transcript. List the key points, decisions, and open action items."
            request = AiCallRequest(
                prompt="Erstelle eine kurze Zusammenfassung dieses Meeting-Transkripts. Nenne die wichtigsten Punkte, Entscheidungen und offene Aktionspunkte.",
                context=fullTranscript,
                options=AiCallOptions(
                    operationType=OperationTypeEnum.DATA_ANALYSE,
                    priority=PriorityEnum.BALANCED,
                )
            )
            response = await aiService.callAi(request)
            if response and response.errorCount == 0:
                interface.updateSession(sessionId, {"summary": response.content})
                logger.info(f"Meeting summary generated for session {sessionId}")
        except Exception as e:
            logger.error(f"Failed to generate meeting summary for session {sessionId}: {e}")