def _registerSpeakerHint(self, speaker: str, text: str, sessionId: str = ""):
    """Track the current speaker from captions for STT attribution.

    When the first non-bot caption arrives, retroactively attributes any
    STT segments that were created before a speaker was known.

    Args:
        speaker: Speaker name reported by the caption / speaker hint.
        text: Caption text. Not used for attribution; kept so existing
            callers do not have to change.
        sessionId: Session identifier, used only in the log message.
    """
    if not speaker:
        return

    normalizedSpeaker = speaker.strip()
    if not normalizedSpeaker or self._isBotSpeaker(normalizedSpeaker):
        return

    prevSpeaker = self._lastCaptionSpeaker
    self._lastCaptionSpeaker = normalizedSpeaker

    if prevSpeaker is None and self._unattributedTranscriptIds:
        # Ids are collected from createdTranscript.get("id") and may be None;
        # never send an update for a record that has no id.
        pendingIds = [tid for tid in self._unattributedTranscriptIds if tid]
        if pendingIds:
            # Resolve the DB interface only when there is something to update.
            from . import interfaceFeatureTeamsbot as interfaceDb
            interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
            for tid in pendingIds:
                interface.updateTranscript(tid, {"speaker": normalizedSpeaker})

        # Keep the in-memory context consistent with the stored transcripts.
        for seg in self._contextBuffer:
            if seg.get("speaker") == "Unknown" and seg.get("source") == "audioCapture":
                seg["speaker"] = normalizedSpeaker
        if self._lastTranscriptSpeaker == "Unknown":
            self._lastTranscriptSpeaker = normalizedSpeaker

        logger.info(
            f"Session {sessionId}: Retroactive speaker attribution: "
            f"{len(pendingIds)} segments -> {normalizedSpeaker}"
        )
        self._unattributedTranscriptIds.clear()

    # A caption counts as speaker activity for a pending debounced name trigger.
    if self._pendingNameTrigger:
        self._pendingNameTrigger["lastActivity"] = time.time()


def _resolveSpeakerForAudioCapture(self) -> Dict[str, Any]:
    """Speaker name for audio chunks — uses the last caption speaker.

    Returns:
        A dict with "speaker" (the last caption speaker, or "Unknown" when no
        caption speaker has been seen yet) and "speakerResolvedFromHint"
        (True only when a caption speaker was available).
    """
    if self._lastCaptionSpeaker:
        return {"speaker": self._lastCaptionSpeaker, "speakerResolvedFromHint": True}
    return {"speaker": "Unknown", "speakerResolvedFromHint": False}
@@ -625,34 +622,27 @@ class TeamsbotService: return # Differential transcript writing: - # If the same speaker is still talking and the new text is a - # continuation (starts with the previous text), UPDATE the existing - # record instead of creating a new one. This avoids cascading rows like: - # "Der AHV" - # "Der AHV Fonds" - # "Der AHV Fonds hat 2025" - # and instead keeps a single row that grows until the speaker changes. - isContinuation = ( - self._lastTranscriptSpeaker == speaker - and self._lastTranscriptText - and self._lastTranscriptId - and text.startswith(self._lastTranscriptText) - and source in ("caption", "audioCapture") + # audioCapture from same speaker → append text (merge STT chunks into one block) + # other sources → always create a new record + isMerge = ( + source == "audioCapture" + and self._lastTranscriptSpeaker == speaker + and self._lastTranscriptText is not None + and self._lastTranscriptId is not None ) - if isContinuation: + if isMerge: + mergedText = f"{self._lastTranscriptText} {text}" interface.updateTranscript(self._lastTranscriptId, { - "text": text, + "text": mergedText, "isFinal": isFinal, }) - self._lastTranscriptText = text + self._lastTranscriptText = mergedText createdTranscript = {"id": self._lastTranscriptId} - # Update context buffer: replace last entry for same speaker if self._contextBuffer and self._contextBuffer[-1].get("speaker") == speaker: - self._contextBuffer[-1]["text"] = text + self._contextBuffer[-1]["text"] = mergedText else: - # New speaker or non-continuation → create a new record transcriptData = TeamsbotTranscript( sessionId=sessionId, speaker=speaker, @@ -665,12 +655,13 @@ class TeamsbotService: createdTranscript = interface.createTranscript(transcriptData) - # Track for differential writing self._lastTranscriptSpeaker = speaker self._lastTranscriptText = text self._lastTranscriptId = createdTranscript.get("id") - # Append to context buffer + if source == "audioCapture" and speaker == "Unknown": + 
self._unattributedTranscriptIds.append(createdTranscript.get("id")) + self._contextBuffer.append({ "speaker": speaker or "Unknown", "text": text, @@ -678,27 +669,25 @@ class TeamsbotService: "source": source, }) - # Keep only last N segments maxSegments = self.config.contextWindowSegments if len(self._contextBuffer) > maxSegments: if not self._contextSummary and len(self._contextBuffer) > maxSegments * 1.5: asyncio.create_task(self._summarizeContextBuffer(sessionId)) self._contextBuffer = self._contextBuffer[-maxSegments:] - # Update session transcript count (only for new records) session = interface.getSession(sessionId) if session: count = session.get("transcriptSegmentCount", 0) + 1 interface.updateSession(sessionId, {"transcriptSegmentCount": count}) - # Emit SSE event for live transcript (always, for UI updates) + displayText = self._lastTranscriptText if isMerge else text await _emitSessionEvent(sessionId, "transcript", { "id": createdTranscript.get("id"), "speaker": speaker, - "text": text, + "text": displayText, "confidence": 1.0, "timestamp": getIsoTimestamp(), - "isContinuation": isContinuation, + "isContinuation": isMerge, "source": source, "speakerResolvedFromHint": ( speakerResolvedFromHint @@ -707,23 +696,29 @@ class TeamsbotService: ), }) - # Check if AI analysis should be triggered (only for final transcripts) if not isFinal: return if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY: - logger.debug(f"Session {sessionId}: responseMode=TRANSCRIBE_ONLY, skipping AI analysis") return - shouldTrigger = self._shouldTriggerAnalysis(text) - logger.debug(f"Session {sessionId}: shouldTriggerAnalysis={shouldTrigger}, bufferSize={len(self._contextBuffer)}, responseMode={self.config.responseMode}") - - if not shouldTrigger: + # Update activity for any pending debounced trigger + if self._pendingNameTrigger: + self._pendingNameTrigger["lastActivity"] = time.time() + + # Bot name detection → debounced trigger (wait for speaker to finish) + if 
def _shouldTriggerAnalysis(self, transcriptText: str, allowPeriodic: bool = True) -> bool:
    """
    Decide whether a periodic AI analysis should run for the latest transcript.

    Bot name detection is handled separately via the debounced name trigger.
    This method only checks the cooldown and the periodic interval.

    Args:
        transcriptText: Latest transcript text. Not inspected here; kept so
            existing callers do not have to change.
        allowPeriodic: When False, the periodic interval never triggers.

    Returns:
        True when the cooldown has passed and the periodic interval elapsed.
    """
    timeSinceLastCall = time.time() - self._lastAiCallTime

    if timeSinceLastCall < self.config.triggerCooldownSeconds:
        return False

    if allowPeriodic and timeSinceLastCall >= self.config.triggerIntervalSeconds:
        logger.info(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed ({timeSinceLastCall:.1f}s)")
        return True

    return False


def _detectBotName(self, text: str) -> bool:
    """Check if text contains the bot's name (exact or loosely similar).

    Matching order: the full bot name as a substring, then the bot's first
    name as an exact word, then a fuzzy word match (same first letter, length
    within 2, at least 60% shared distinct characters) to tolerate STT
    misspellings.

    Args:
        text: Transcript or caption text; None or empty never matches.

    Returns:
        True when the bot name was detected, otherwise False.
    """
    botNameLower = (self.config.botName or "").strip().lower()
    textLower = (text or "").lower()

    # An empty name would make `"" in textLower` true for every transcript,
    # and a whitespace-only name would break split()[0] below.
    if not botNameLower or not textLower.strip():
        return False

    if botNameLower in textLower:
        return True

    botFirstName = botNameLower.split()[0]
    if len(botFirstName) < 3:
        # Too short for a meaningful word-level / fuzzy comparison.
        return False

    botChars = set(botFirstName)
    for word in textLower.split():
        cleanWord = word.strip(".,!?:;\"'()[]")
        if len(cleanWord) < 3:
            continue
        if cleanWord == botFirstName:
            return True
        if cleanWord[0] == botFirstName[0] and abs(len(cleanWord) - len(botFirstName)) <= 2:
            common = sum(1 for c in botChars if c in cleanWord)
            similarity = common / max(len(botChars), len(set(cleanWord)))
            if similarity >= 0.6:
                return True
    return False


def _setPendingNameTrigger(self, sessionId, interface, voiceInterface, websocket, triggerTranscript) -> bool:
    """Set or update a debounced name trigger.

    If a trigger is already pending, only its activity timestamp is refreshed
    and the originally stored transcript/handles are kept.

    Returns:
        True if a new pending trigger was created, False if one already existed.
    """
    now = time.time()
    if self._pendingNameTrigger:
        self._pendingNameTrigger["lastActivity"] = now
        return False

    self._pendingNameTrigger = {
        "sessionId": sessionId,
        "interface": interface,
        "voiceInterface": voiceInterface,
        "websocket": websocket,
        "triggerTranscript": triggerTranscript,
        "detectedAt": now,
        "lastActivity": now,
    }
    return True


async def _checkPendingNameTrigger(self, delaySec: float = 3.0):
    """Async loop: fire the pending name trigger once the speaker is quiet.

    Waits ``delaySec``, then fires when there has been no activity for 3
    seconds or when 15 seconds have passed since the name was detected.
    Otherwise it keeps waiting inside this same coroutine; it does not spawn
    follow-up tasks, so no unreferenced task can be garbage-collected mid-wait.

    Args:
        delaySec: Initial delay in seconds before the first check.
    """
    quietThresholdSec = 3.0
    maxWaitSec = 15.0

    while True:
        await asyncio.sleep(delaySec)

        pending = self._pendingNameTrigger
        if not pending:
            return

        now = time.time()
        quietSec = now - pending.get("lastActivity", 0)
        totalWaitSec = now - pending.get("detectedAt", 0)
        if quietSec >= quietThresholdSec or totalWaitSec >= maxWaitSec:
            break

        # Speaker is still active: wait for the rest of the quiet window.
        delaySec = max(0.5, quietThresholdSec - quietSec)

    # Clear before awaiting so transcripts arriving during analysis can arm a new trigger.
    self._pendingNameTrigger = None
    logger.info(
        f"Session {pending['sessionId']}: Debounced name trigger fires "
        f"(quiet={quietSec:.1f}s, totalWait={totalWaitSec:.1f}s)"
    )
    await self._analyzeAndRespond(
        pending["sessionId"],
        pending["interface"],
        pending["voiceInterface"],
        pending["websocket"],
        pending["triggerTranscript"],
    )