549 lines
22 KiB
Python
549 lines
22 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
"""
|
|
Teamsbot Service - Pipeline Orchestrator.
|
|
Manages the audio processing pipeline: STT -> Context Buffer -> SPEECH_TEAMS -> TTS -> Bridge.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import asyncio
|
|
import time
|
|
import base64
|
|
from typing import Optional, Dict, Any, List
|
|
|
|
from fastapi import WebSocket
|
|
|
|
from modules.datamodels.datamodelUam import User
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
|
|
from modules.shared.timeUtils import getUtcTimestamp
|
|
|
|
from .datamodelTeamsbot import (
|
|
TeamsbotSessionStatus,
|
|
TeamsbotTranscript,
|
|
TeamsbotBotResponse,
|
|
TeamsbotResponseType,
|
|
TeamsbotConfig,
|
|
TeamsbotResponseMode,
|
|
SpeechTeamsResponse,
|
|
)
|
|
from .bridgeConnector import BridgeConnector
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# =========================================================================
|
|
# Session Event Queues (for SSE streaming to frontend)
|
|
# =========================================================================
|
|
_sessionEvents: Dict[str, asyncio.Queue] = {}
|
|
|
|
|
|
async def _emitSessionEvent(sessionId: str, eventType: str, data: Any):
|
|
"""Emit an event to the session's SSE stream."""
|
|
eventQueue = _sessionEvents.get(sessionId)
|
|
if eventQueue:
|
|
await eventQueue.put({"type": eventType, "data": data, "timestamp": getUtcTimestamp()})
|
|
|
|
|
|
class TeamsbotService:
    """
    Pipeline Orchestrator for Teams Bot sessions.

    Coordinates VoiceObjects (STT/TTS), AiService (SPEECH_TEAMS), and Bridge communication.
    """

    def __init__(self, currentUser: User, mandateId: str, instanceId: str, config: TeamsbotConfig):
        """
        Bind the service to one user/mandate/instance and prepare the bridge connector.

        Args:
            currentUser: Authenticated user on whose behalf DB/AI calls are made.
            mandateId: Mandate (tenant) scope for all persistence operations.
            instanceId: Teamsbot feature instance this service orchestrates.
            config: Effective bot configuration (bridge URL, language, triggers, ...).
        """
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.instanceId = instanceId
        self.config = config
        # Connector to the .NET Media Bridge; URL is resolved from the config.
        self.bridgeConnector = BridgeConnector(config._getEffectiveBridgeUrl())

        # State
        # Wall-clock time (time.time()) of the last SPEECH_TEAMS call; drives
        # the cooldown / periodic-trigger logic in _shouldTriggerAnalysis.
        self._lastAiCallTime: float = 0.0
        # Rolling window of recent transcript segments used as AI context.
        self._contextBuffer: List[Dict[str, Any]] = []
|
|
|
|
# =========================================================================
|
|
# Session Lifecycle
|
|
# =========================================================================
|
|
|
|
async def joinMeeting(
    self,
    sessionId: str,
    meetingLink: str,
    connectionId: Optional[str] = None,
    gatewayBaseUrl: str = "",
):
    """Send join command to the .NET Media Bridge.

    Drives the session status through JOINING -> ACTIVE (or ERROR) and mirrors
    every transition to the frontend via the session's SSE event queue.

    Args:
        sessionId: Teamsbot session to join the meeting for.
        meetingLink: Teams meeting join URL forwarded to the bridge.
        connectionId: Unused by this method; kept for caller compatibility.
        gatewayBaseUrl: Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net).
            Passed to the bridge so it can build full callback/WS URLs per-session.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb

    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)

    # Initialize SSE event queue
    # NOTE(review): this entry is only removed in leaveMeeting(); if joining
    # fails and the session is never "left", the queue lingers in
    # _sessionEvents — confirm the SSE consumer (or caller) cleans up.
    _sessionEvents[sessionId] = asyncio.Queue()

    try:
        # Update status to JOINING
        interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.JOINING.value})
        await _emitSessionEvent(sessionId, "statusChange", {"status": "joining"})

        # Send join command to bridge
        session = interface.getSession(sessionId)
        if not session:
            raise ValueError(f"Session {sessionId} not found")

        result = await self.bridgeConnector.joinMeeting(
            meetingLink=meetingLink,
            # Session-level overrides take precedence over the instance config.
            botName=session.get("botName", self.config.botName),
            backgroundImageUrl=session.get("backgroundImageUrl"),
            sessionId=sessionId,
            # Relative endpoints on this gateway; the bridge combines them with
            # gatewayBaseUrl to form absolute callback/WS URLs per-session.
            gatewayCallbackUrl=f"/api/teamsbot/{self.instanceId}/bridge/status",
            gatewayWsUrl=f"/api/teamsbot/{self.instanceId}/bridge/audio/{sessionId}",
            gatewayBaseUrl=gatewayBaseUrl,
        )

        if result.get("success"):
            # Bridge accepted the join: persist its session id and mark ACTIVE.
            bridgeSessionId = result.get("bridgeSessionId")
            interface.updateSession(sessionId, {
                "bridgeSessionId": bridgeSessionId,
                "status": TeamsbotSessionStatus.ACTIVE.value,
                "startedAt": getUtcTimestamp(),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "active"})
            logger.info(f"Bot joined meeting for session {sessionId}")
        else:
            # Bridge rejected the join: record the error on the session.
            errorMsg = result.get("error", "Unknown error joining meeting")
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ERROR.value,
                "errorMessage": errorMsg,
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": errorMsg})
            logger.error(f"Failed to join meeting for session {sessionId}: {errorMsg}")

    except Exception as e:
        # Any unexpected failure (including the missing-session ValueError
        # above) is recorded as an ERROR state instead of propagating.
        logger.error(f"Error joining meeting for session {sessionId}: {e}")
        interface.updateSession(sessionId, {
            "status": TeamsbotSessionStatus.ERROR.value,
            "errorMessage": str(e),
        })
        await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)})
|
|
|
|
async def leaveMeeting(self, sessionId: str):
    """Send leave command to the .NET Media Bridge.

    Marks the session LEAVING -> ENDED (or ERROR on failure), kicks off the
    meeting-summary generation in the background, and always removes the
    session's SSE event queue.

    Args:
        sessionId: Teamsbot session whose bot should leave the meeting.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb

    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)

    try:
        interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.LEAVING.value})
        await _emitSessionEvent(sessionId, "statusChange", {"status": "leaving"})

        await self.bridgeConnector.leaveMeeting(sessionId)

        interface.updateSession(sessionId, {
            "status": TeamsbotSessionStatus.ENDED.value,
            "endedAt": getUtcTimestamp(),
        })
        await _emitSessionEvent(sessionId, "statusChange", {"status": "ended"})

        # Generate meeting summary in background.
        # Keep a strong reference to the task: the event loop holds only a
        # weak reference, so a bare create_task() result may be garbage
        # collected before the summary finishes.
        summaryTask = asyncio.create_task(self._generateMeetingSummary(sessionId))
        if not hasattr(self, "_backgroundTasks"):
            self._backgroundTasks = set()
        self._backgroundTasks.add(summaryTask)
        summaryTask.add_done_callback(self._backgroundTasks.discard)

        logger.info(f"Bot left meeting for session {sessionId}")

    except Exception as e:
        logger.error(f"Error leaving meeting for session {sessionId}: {e}")
        interface.updateSession(sessionId, {
            "status": TeamsbotSessionStatus.ERROR.value,
            "errorMessage": str(e),
            "endedAt": getUtcTimestamp(),
        })
    finally:
        # Cleanup event queue regardless of how the leave attempt went.
        _sessionEvents.pop(sessionId, None)
|
|
|
|
# =========================================================================
|
|
# Audio Processing Pipeline
|
|
# =========================================================================
|
|
|
|
async def handleAudioStream(self, websocket: WebSocket, sessionId: str):
    """
    Main audio processing loop.

    Receives PCM audio from the bridge via WebSocket and feeds it through the
    STT -> AI -> TTS pipeline in ~1.5 second chunks.

    Args:
        websocket: Accepted bridge WebSocket carrying audio and control frames.
        sessionId: Session this audio stream belongs to.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb
    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface

    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
    voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)

    audioBuffer = bytearray()
    targetBufferMs = 1500  # Buffer 1.5 seconds of audio before STT

    # PCM16 at 16kHz mono = 32000 bytes/second
    bytesPerMs = 32000 / 1000

    logger.info(f"Audio processing started for session {sessionId}")

    try:
        while True:
            # Receive raw ASGI message from bridge
            data = await websocket.receive()

            # Stop cleanly on disconnect instead of relying on the
            # RuntimeError a subsequent receive() would raise.
            if data.get("type") == "websocket.disconnect":
                logger.info(f"Bridge websocket disconnected: {sessionId}")
                break

            # Per the ASGI spec both "bytes" and "text" keys may be present
            # with exactly one non-None, so test the values, not key presence.
            if data.get("bytes") is not None:
                audioChunk = data["bytes"]
            elif data.get("text") is not None:
                # JSON message (control messages)
                message = json.loads(data["text"])
                msgType = message.get("type")

                if msgType == "audio_chunk":
                    audioChunk = base64.b64decode(message.get("data", ""))
                elif msgType == "session_ended":
                    logger.info(f"Bridge signaled session ended: {sessionId}")
                    break
                elif msgType == "ping":
                    await websocket.send_text(json.dumps({"type": "pong"}))
                    continue
                else:
                    continue
            else:
                continue

            # Accumulate audio and process once enough has buffered.
            audioBuffer.extend(audioChunk)
            if len(audioBuffer) / bytesPerMs >= targetBufferMs:
                await self._processAudioBuffer(
                    bytes(audioBuffer),
                    sessionId,
                    interface,
                    voiceInterface,
                    websocket,
                )
                audioBuffer.clear()

    except Exception as e:
        # Disconnect-shaped errors are expected at stream end; log the rest.
        if "disconnect" not in str(e).lower():
            logger.error(f"Audio stream error for session {sessionId}: {e}")

    # Flush whatever audio is left in the buffer.
    if len(audioBuffer) > 0:
        await self._processAudioBuffer(
            bytes(audioBuffer),
            sessionId,
            interface,
            voiceInterface,
            websocket,
        )

    logger.info(f"Audio processing ended for session {sessionId}")
|
|
|
|
async def _processAudioBuffer(
    self,
    audioBytes: bytes,
    sessionId: str,
    interface,
    voiceInterface,
    websocket: WebSocket,
):
    """Process a buffered audio chunk through the STT -> AI -> TTS pipeline.

    Args:
        audioBytes: Raw PCM audio (16 kHz mono, per handleAudioStream's buffering).
        sessionId: Session the audio belongs to.
        interface: Teamsbot persistence interface (sessions, transcripts, responses).
        voiceInterface: VoiceObjects interface providing speechToText/textToSpeech.
        websocket: Bridge WebSocket used downstream to send TTS audio back.
    """

    # Step 1: STT -- convert audio to text
    try:
        sttResult = voiceInterface.speechToText(
            audioContent=audioBytes,
            language=self.config.language,
            sampleRate=16000,
            channels=1
        )
    except Exception as e:
        # STT failures are non-fatal: drop this chunk and keep streaming.
        logger.warning(f"STT failed for session {sessionId}: {e}")
        return

    # The STT backend may return either a dict or a plain string result.
    transcriptText = sttResult.get("text", "").strip() if isinstance(sttResult, dict) else str(sttResult).strip()
    if not transcriptText:
        # Silence / nothing recognized in this chunk.
        return

    confidence = sttResult.get("confidence", 0.0) if isinstance(sttResult, dict) else 0.0
    speaker = sttResult.get("speaker") if isinstance(sttResult, dict) else None

    # Store transcript segment
    transcriptData = TeamsbotTranscript(
        sessionId=sessionId,
        speaker=speaker,
        text=transcriptText,
        timestamp=str(getUtcTimestamp()),
        confidence=confidence,
        language=self.config.language,
        isFinal=True,
    ).model_dump()

    createdTranscript = interface.createTranscript(transcriptData)

    # Update context buffer
    self._contextBuffer.append({
        "speaker": speaker or "Unknown",
        "text": transcriptText,
        "timestamp": getUtcTimestamp(),
    })
    # Keep only last N segments
    maxSegments = self.config.contextWindowSegments
    if len(self._contextBuffer) > maxSegments:
        self._contextBuffer = self._contextBuffer[-maxSegments:]

    # Emit SSE event for live transcript
    await _emitSessionEvent(sessionId, "transcript", {
        "id": createdTranscript.get("id"),
        "speaker": speaker,
        "text": transcriptText,
        "confidence": confidence,
        "timestamp": getUtcTimestamp(),
    })

    # Update session transcript count
    # NOTE(review): read-modify-write — concurrent processing for the same
    # session could lose increments; confirm one audio loop per session.
    session = interface.getSession(sessionId)
    if session:
        count = session.get("transcriptSegmentCount", 0) + 1
        interface.updateSession(sessionId, {"transcriptSegmentCount": count})

    # Step 2: Check if AI analysis should be triggered
    if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY:
        return

    if not self._shouldTriggerAnalysis(transcriptText):
        return

    # Step 3: SPEECH_TEAMS AI analysis
    await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript)
|
|
|
|
def _shouldTriggerAnalysis(self, transcriptText: str) -> bool:
|
|
"""
|
|
Decide whether to trigger AI analysis based on the latest transcript.
|
|
Triggers:
|
|
- Bot name mentioned (immediate)
|
|
- Periodic interval elapsed
|
|
- Cooldown respected
|
|
"""
|
|
now = time.time()
|
|
|
|
# Cooldown check
|
|
timeSinceLastCall = now - self._lastAiCallTime
|
|
if timeSinceLastCall < self.config.triggerCooldownSeconds:
|
|
return False
|
|
|
|
# Bot name mentioned -> immediate trigger
|
|
botNameLower = self.config.botName.lower()
|
|
if botNameLower in transcriptText.lower():
|
|
logger.debug(f"Trigger: Bot name '{self.config.botName}' detected in transcript")
|
|
return True
|
|
|
|
# Periodic trigger
|
|
if timeSinceLastCall >= self.config.triggerIntervalSeconds:
|
|
logger.debug(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed")
|
|
return True
|
|
|
|
return False
|
|
|
|
async def _analyzeAndRespond(
    self,
    sessionId: str,
    interface,
    voiceInterface,
    websocket: WebSocket,
    triggerTranscript: Dict[str, Any],
):
    """Run SPEECH_TEAMS AI analysis and respond if needed.

    Builds a context window from the recent transcript buffer, asks the AI
    whether (and how) the bot should respond, and — in auto mode — speaks
    the answer via TTS over the bridge WebSocket and persists the response.

    Args:
        sessionId: Session being analyzed.
        interface: Teamsbot persistence interface.
        voiceInterface: VoiceObjects interface providing textToSpeech.
        websocket: Bridge WebSocket for sending TTS audio.
        triggerTranscript: Stored transcript record that triggered this analysis.
    """
    # Record the call time up-front so the cooldown in _shouldTriggerAnalysis
    # applies even if this analysis fails.
    self._lastAiCallTime = time.time()

    # Build transcript context from buffer
    contextLines = []
    for segment in self._contextBuffer:
        speaker = segment.get("speaker", "Unknown")
        text = segment.get("text", "")
        contextLines.append(f"[{speaker}]: {text}")

    # The BOT_NAME header tells the model which name it is addressed by.
    transcriptContext = f"BOT_NAME:{self.config.botName}\n" + "\n".join(contextLines)

    # Call SPEECH_TEAMS
    try:
        from modules.services.serviceAi.mainServiceAi import AiService

        # Create AiService with service center context
        # Note: In production, serviceCenter should be passed properly
        aiService = AiService(serviceCenter=None)
        await aiService.ensureAiObjectsInitialized()

        request = AiCallRequest(
            prompt=self.config.aiSystemPrompt,
            context=transcriptContext,
            options=AiCallOptions(
                operationType=OperationTypeEnum.SPEECH_TEAMS,
                priority=PriorityEnum.SPEED,
            )
        )

        response = await aiService.callAi(request)

        # Parse structured response
        try:
            speechResult = SpeechTeamsResponse.model_validate_json(response.content)
        except Exception:
            # Try to extract JSON from response content
            # (models frequently wrap their JSON in markdown code fences).
            try:
                jsonStr = response.content
                if "```json" in jsonStr:
                    jsonStr = jsonStr.split("```json")[1].split("```")[0]
                elif "```" in jsonStr:
                    jsonStr = jsonStr.split("```")[1].split("```")[0]
                speechResult = SpeechTeamsResponse.model_validate_json(jsonStr.strip())
            except Exception as parseErr:
                # Unparseable output: fail safe by not responding at all.
                logger.warning(f"Failed to parse SPEECH_TEAMS response: {parseErr}")
                speechResult = SpeechTeamsResponse(
                    shouldRespond=False,
                    reasoning=f"Parse error: {str(parseErr)[:100]}",
                    detectedIntent="none"
                )

        logger.info(
            f"SPEECH_TEAMS result: shouldRespond={speechResult.shouldRespond}, "
            f"intent={speechResult.detectedIntent}, "
            f"reasoning={speechResult.reasoning[:80]}..."
        )

        # Emit analysis event (always, for debug/UI)
        await _emitSessionEvent(sessionId, "analysis", {
            "shouldRespond": speechResult.shouldRespond,
            "detectedIntent": speechResult.detectedIntent,
            "reasoning": speechResult.reasoning,
            "modelName": response.modelName,
            "processingTime": response.processingTime,
            "priceCHF": response.priceCHF,
        })

        # Step 4: Respond if AI decided to
        if speechResult.shouldRespond and speechResult.responseText:

            if self.config.responseMode == TeamsbotResponseMode.MANUAL:
                # In manual mode, suggest but don't send
                await _emitSessionEvent(sessionId, "suggestedResponse", {
                    "responseText": speechResult.responseText,
                    "detectedIntent": speechResult.detectedIntent,
                    "reasoning": speechResult.reasoning,
                })
                return

            # Auto mode: send voice + chat response
            responseType = TeamsbotResponseType.BOTH

            # 4a: TTS -> Audio to bridge
            try:
                ttsResult = voiceInterface.textToSpeech(
                    text=speechResult.responseText,
                    languageCode=self.config.language,
                    voiceName=self.config.voiceId
                )

                if ttsResult and isinstance(ttsResult, dict):
                    audioContent = ttsResult.get("audioContent")
                    if audioContent:
                        # Send TTS audio to bridge
                        await websocket.send_text(json.dumps({
                            "type": "tts_audio",
                            "data": base64.b64encode(audioContent if isinstance(audioContent, bytes) else audioContent.encode()).decode(),
                            "format": "mp3",
                        }))
            except Exception as ttsErr:
                logger.warning(f"TTS failed for session {sessionId}: {ttsErr}")
                responseType = TeamsbotResponseType.CHAT  # Fallback to chat only

            # 4b: Store bot response
            botResponseData = TeamsbotBotResponse(
                sessionId=sessionId,
                responseText=speechResult.responseText,
                responseType=responseType,
                detectedIntent=speechResult.detectedIntent,
                reasoning=speechResult.reasoning,
                triggeredByTranscriptId=triggerTranscript.get("id"),
                modelName=response.modelName,
                processingTime=response.processingTime,
                priceCHF=response.priceCHF,
                timestamp=str(getUtcTimestamp()),
            ).model_dump()

            createdResponse = interface.createBotResponse(botResponseData)

            # 4c: Emit SSE event
            await _emitSessionEvent(sessionId, "botResponse", {
                "id": createdResponse.get("id"),
                "responseText": speechResult.responseText,
                "responseType": responseType.value,
                "detectedIntent": speechResult.detectedIntent,
                "reasoning": speechResult.reasoning,
                "modelName": response.modelName,
                "processingTime": response.processingTime,
                "priceCHF": response.priceCHF,
            })

            # Update session response count
            # NOTE(review): read-modify-write, same caveat as the transcript counter.
            session = interface.getSession(sessionId)
            if session:
                count = session.get("botResponseCount", 0) + 1
                interface.updateSession(sessionId, {"botResponseCount": count})

            logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}")

    except Exception as e:
        # Analysis failures never propagate into the audio loop; surface them
        # to the frontend via an SSE error event instead.
        logger.error(f"SPEECH_TEAMS analysis failed for session {sessionId}: {e}")
        await _emitSessionEvent(sessionId, "error", {"message": f"AI analysis failed: {str(e)}"})
|
|
|
|
# =========================================================================
|
|
# Meeting Summary
|
|
# =========================================================================
|
|
|
|
async def _generateMeetingSummary(self, sessionId: str):
    """Generate an AI summary of the meeting after it ends.

    Loads all transcript segments for the session, asks the AI service for a
    short summary (key points, decisions, open action items) and stores it on
    the session record. All failures are logged and swallowed — summarization
    is best-effort.
    """
    try:
        from . import interfaceFeatureTeamsbot as interfaceDb

        interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
        transcripts = interface.getTranscripts(sessionId)

        # Too little material for a meaningful summary.
        if not transcripts or len(transcripts) < 5:
            return

        # Render the stored segments as "[speaker]: text" lines.
        segmentLines = [
            f"[{entry.get('speaker', 'Unknown')}]: {entry.get('text', '')}"
            for entry in transcripts
        ]
        fullTranscript = "\n".join(segmentLines)

        from modules.services.serviceAi.mainServiceAi import AiService

        aiService = AiService(serviceCenter=None)
        await aiService.ensureAiObjectsInitialized()

        summaryRequest = AiCallRequest(
            prompt="Erstelle eine kurze Zusammenfassung dieses Meeting-Transkripts. Nenne die wichtigsten Punkte, Entscheidungen und offene Aktionspunkte.",
            context=fullTranscript,
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.BALANCED,
            )
        )

        summaryResponse = await aiService.callAi(summaryRequest)
        if summaryResponse and summaryResponse.errorCount == 0:
            interface.updateSession(sessionId, {"summary": summaryResponse.content})
            logger.info(f"Meeting summary generated for session {sessionId}")

    except Exception as e:
        logger.error(f"Failed to generate meeting summary for session {sessionId}: {e}")
|