Teamsbot: redesign speaker attribution, add bot responses to transcript, debounce name triggers
Made-with: Cursor
This commit is contained in:
parent
269b704812
commit
80e8197d96
1 changed file with 184 additions and 127 deletions
|
|
@ -83,15 +83,20 @@ class TeamsbotService:
|
|||
self._sessionContext: Optional[str] = None # User-provided background context
|
||||
self._contextSummary: Optional[str] = None # AI-generated summary of long context
|
||||
|
||||
# Differential transcript tracking: only write new text, update existing
|
||||
# record when the same speaker continues speaking
|
||||
# Differential transcript tracking
|
||||
self._lastTranscriptSpeaker: Optional[str] = None
|
||||
self._lastTranscriptText: Optional[str] = None
|
||||
self._lastTranscriptId: Optional[str] = None
|
||||
self._recentSpeakerHints: List[Dict[str, Any]] = []
|
||||
self._lastBotResponseText: Optional[str] = None
|
||||
self._lastBotResponseTs: float = 0.0
|
||||
|
||||
# Speaker attribution: simple last-caption-speaker model
|
||||
self._lastCaptionSpeaker: Optional[str] = None
|
||||
self._unattributedTranscriptIds: List[str] = []
|
||||
|
||||
# Debounced name trigger: wait for speaker to finish before AI analysis
|
||||
self._pendingNameTrigger: Optional[Dict[str, Any]] = None
|
||||
|
||||
# =========================================================================
|
||||
# Session Lifecycle
|
||||
# =========================================================================
|
||||
|
|
@ -494,40 +499,43 @@ class TeamsbotService:
|
|||
except Exception as e:
|
||||
logger.error(f"[AudioChunk] STT error for session {sessionId}: {type(e).__name__}: {e}")
|
||||
|
||||
def _registerSpeakerHint(self, speaker: str, text: str):
|
||||
"""Store recent speaker hints from captions for audio-mode speaker attribution."""
|
||||
def _registerSpeakerHint(self, speaker: str, text: str, sessionId: str = ""):
|
||||
"""Track current speaker from captions for STT attribution.
|
||||
When the first non-bot caption arrives, retroactively attributes
|
||||
any STT segments that were created before a speaker was known."""
|
||||
if not speaker:
|
||||
return
|
||||
normalizedSpeaker = speaker.strip()
|
||||
if not normalizedSpeaker or self._isBotSpeaker(normalizedSpeaker):
|
||||
return
|
||||
|
||||
self._recentSpeakerHints.append({
|
||||
"speaker": normalizedSpeaker,
|
||||
"text": (text or "").strip(),
|
||||
"timestamp": time.time(),
|
||||
})
|
||||
prevSpeaker = self._lastCaptionSpeaker
|
||||
self._lastCaptionSpeaker = normalizedSpeaker
|
||||
|
||||
# Keep only the latest 20 hints
|
||||
if len(self._recentSpeakerHints) > 20:
|
||||
self._recentSpeakerHints = self._recentSpeakerHints[-20:]
|
||||
if prevSpeaker is None and self._unattributedTranscriptIds:
|
||||
from . import interfaceFeatureTeamsbot as interfaceDb
|
||||
interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
|
||||
for tid in self._unattributedTranscriptIds:
|
||||
interface.updateTranscript(tid, {"speaker": normalizedSpeaker})
|
||||
for seg in self._contextBuffer:
|
||||
if seg.get("speaker") == "Unknown" and seg.get("source") == "audioCapture":
|
||||
seg["speaker"] = normalizedSpeaker
|
||||
if self._lastTranscriptSpeaker == "Unknown":
|
||||
self._lastTranscriptSpeaker = normalizedSpeaker
|
||||
logger.info(
|
||||
f"Session {sessionId}: Retroactive speaker attribution: "
|
||||
f"{len(self._unattributedTranscriptIds)} segments -> {normalizedSpeaker}"
|
||||
)
|
||||
self._unattributedTranscriptIds.clear()
|
||||
|
||||
if self._pendingNameTrigger:
|
||||
self._pendingNameTrigger["lastActivity"] = time.time()
|
||||
|
||||
def _resolveSpeakerForAudioCapture(self) -> Dict[str, Any]:
|
||||
"""Best-effort speaker name for audio chunks using recent caption hints."""
|
||||
if not self._recentSpeakerHints:
|
||||
return {"speaker": "Meeting Audio", "speakerResolvedFromHint": False}
|
||||
|
||||
nowTs = time.time()
|
||||
# Prefer very recent hints to reduce wrong attribution
|
||||
for hint in reversed(self._recentSpeakerHints):
|
||||
hintAge = nowTs - hint.get("timestamp", 0)
|
||||
if hintAge <= 15:
|
||||
return {
|
||||
"speaker": hint.get("speaker", "Meeting Audio"),
|
||||
"speakerResolvedFromHint": True,
|
||||
}
|
||||
|
||||
return {"speaker": "Meeting Audio", "speakerResolvedFromHint": False}
|
||||
"""Speaker name for audio chunks — uses the last caption speaker."""
|
||||
if self._lastCaptionSpeaker:
|
||||
return {"speaker": self._lastCaptionSpeaker, "speakerResolvedFromHint": True}
|
||||
return {"speaker": "Unknown", "speakerResolvedFromHint": False}
|
||||
|
||||
async def _processTranscript(
|
||||
self,
|
||||
|
|
@ -560,31 +568,20 @@ class TeamsbotService:
|
|||
# using existing audio-based context — but caption text itself is NOT
|
||||
# added to the context buffer.
|
||||
if source in ("caption", "speakerHint"):
|
||||
self._registerSpeakerHint(speaker, text)
|
||||
self._registerSpeakerHint(speaker, text, sessionId)
|
||||
|
||||
if (
|
||||
source == "speakerHint"
|
||||
and isFinal
|
||||
and not self._isBotSpeaker(speaker)
|
||||
and self.config.responseMode != TeamsbotResponseMode.TRANSCRIBE_ONLY
|
||||
and self._detectBotName(text)
|
||||
):
|
||||
shouldTriggerFromHint = self._shouldTriggerAnalysis(text, allowPeriodic=False)
|
||||
logger.debug(
|
||||
f"Session {sessionId}: speakerHint shouldTriggerAnalysis={shouldTriggerFromHint}, "
|
||||
f"bufferSize={len(self._contextBuffer)}"
|
||||
)
|
||||
if shouldTriggerFromHint:
|
||||
logger.info(
|
||||
f"Session {sessionId}: Triggering AI analysis from speakerHint address detection "
|
||||
f"(buffer: {len(self._contextBuffer)} segments, caption text NOT in buffer)"
|
||||
)
|
||||
await self._analyzeAndRespond(
|
||||
sessionId,
|
||||
interface,
|
||||
voiceInterface,
|
||||
websocket,
|
||||
{"id": None, "speaker": speaker, "text": text, "source": source},
|
||||
)
|
||||
triggerTranscript = {"id": None, "speaker": speaker, "text": text, "source": source}
|
||||
isNew = self._setPendingNameTrigger(sessionId, interface, voiceInterface, websocket, triggerTranscript)
|
||||
if isNew:
|
||||
logger.info(f"Session {sessionId}: Bot name in caption, debounce trigger started")
|
||||
asyncio.create_task(self._checkPendingNameTrigger())
|
||||
return
|
||||
|
||||
# Chat history: messages sent before the bot joined the meeting.
|
||||
|
|
@ -625,34 +622,27 @@ class TeamsbotService:
|
|||
return
|
||||
|
||||
# Differential transcript writing:
|
||||
# If the same speaker is still talking and the new text is a
|
||||
# continuation (starts with the previous text), UPDATE the existing
|
||||
# record instead of creating a new one. This avoids cascading rows like:
|
||||
# "Der AHV"
|
||||
# "Der AHV Fonds"
|
||||
# "Der AHV Fonds hat 2025"
|
||||
# and instead keeps a single row that grows until the speaker changes.
|
||||
isContinuation = (
|
||||
self._lastTranscriptSpeaker == speaker
|
||||
and self._lastTranscriptText
|
||||
and self._lastTranscriptId
|
||||
and text.startswith(self._lastTranscriptText)
|
||||
and source in ("caption", "audioCapture")
|
||||
# audioCapture from same speaker → append text (merge STT chunks into one block)
|
||||
# other sources → always create a new record
|
||||
isMerge = (
|
||||
source == "audioCapture"
|
||||
and self._lastTranscriptSpeaker == speaker
|
||||
and self._lastTranscriptText is not None
|
||||
and self._lastTranscriptId is not None
|
||||
)
|
||||
|
||||
if isContinuation:
|
||||
if isMerge:
|
||||
mergedText = f"{self._lastTranscriptText} {text}"
|
||||
interface.updateTranscript(self._lastTranscriptId, {
|
||||
"text": text,
|
||||
"text": mergedText,
|
||||
"isFinal": isFinal,
|
||||
})
|
||||
self._lastTranscriptText = text
|
||||
self._lastTranscriptText = mergedText
|
||||
createdTranscript = {"id": self._lastTranscriptId}
|
||||
|
||||
# Update context buffer: replace last entry for same speaker
|
||||
if self._contextBuffer and self._contextBuffer[-1].get("speaker") == speaker:
|
||||
self._contextBuffer[-1]["text"] = text
|
||||
self._contextBuffer[-1]["text"] = mergedText
|
||||
else:
|
||||
# New speaker or non-continuation → create a new record
|
||||
transcriptData = TeamsbotTranscript(
|
||||
sessionId=sessionId,
|
||||
speaker=speaker,
|
||||
|
|
@ -665,12 +655,13 @@ class TeamsbotService:
|
|||
|
||||
createdTranscript = interface.createTranscript(transcriptData)
|
||||
|
||||
# Track for differential writing
|
||||
self._lastTranscriptSpeaker = speaker
|
||||
self._lastTranscriptText = text
|
||||
self._lastTranscriptId = createdTranscript.get("id")
|
||||
|
||||
# Append to context buffer
|
||||
if source == "audioCapture" and speaker == "Unknown":
|
||||
self._unattributedTranscriptIds.append(createdTranscript.get("id"))
|
||||
|
||||
self._contextBuffer.append({
|
||||
"speaker": speaker or "Unknown",
|
||||
"text": text,
|
||||
|
|
@ -678,27 +669,25 @@ class TeamsbotService:
|
|||
"source": source,
|
||||
})
|
||||
|
||||
# Keep only last N segments
|
||||
maxSegments = self.config.contextWindowSegments
|
||||
if len(self._contextBuffer) > maxSegments:
|
||||
if not self._contextSummary and len(self._contextBuffer) > maxSegments * 1.5:
|
||||
asyncio.create_task(self._summarizeContextBuffer(sessionId))
|
||||
self._contextBuffer = self._contextBuffer[-maxSegments:]
|
||||
|
||||
# Update session transcript count (only for new records)
|
||||
session = interface.getSession(sessionId)
|
||||
if session:
|
||||
count = session.get("transcriptSegmentCount", 0) + 1
|
||||
interface.updateSession(sessionId, {"transcriptSegmentCount": count})
|
||||
|
||||
# Emit SSE event for live transcript (always, for UI updates)
|
||||
displayText = self._lastTranscriptText if isMerge else text
|
||||
await _emitSessionEvent(sessionId, "transcript", {
|
||||
"id": createdTranscript.get("id"),
|
||||
"speaker": speaker,
|
||||
"text": text,
|
||||
"text": displayText,
|
||||
"confidence": 1.0,
|
||||
"timestamp": getIsoTimestamp(),
|
||||
"isContinuation": isContinuation,
|
||||
"isContinuation": isMerge,
|
||||
"source": source,
|
||||
"speakerResolvedFromHint": (
|
||||
speakerResolvedFromHint
|
||||
|
|
@ -707,23 +696,29 @@ class TeamsbotService:
|
|||
),
|
||||
})
|
||||
|
||||
# Check if AI analysis should be triggered (only for final transcripts)
|
||||
if not isFinal:
|
||||
return
|
||||
|
||||
if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY:
|
||||
logger.debug(f"Session {sessionId}: responseMode=TRANSCRIBE_ONLY, skipping AI analysis")
|
||||
return
|
||||
|
||||
shouldTrigger = self._shouldTriggerAnalysis(text)
|
||||
logger.debug(f"Session {sessionId}: shouldTriggerAnalysis={shouldTrigger}, bufferSize={len(self._contextBuffer)}, responseMode={self.config.responseMode}")
|
||||
|
||||
if not shouldTrigger:
|
||||
# Update activity for any pending debounced trigger
|
||||
if self._pendingNameTrigger:
|
||||
self._pendingNameTrigger["lastActivity"] = time.time()
|
||||
|
||||
# Bot name detection → debounced trigger (wait for speaker to finish)
|
||||
if self._detectBotName(text):
|
||||
isNew = self._setPendingNameTrigger(sessionId, interface, voiceInterface, websocket, createdTranscript)
|
||||
if isNew:
|
||||
asyncio.create_task(self._checkPendingNameTrigger())
|
||||
return
|
||||
|
||||
# SPEECH_TEAMS AI analysis
|
||||
logger.info(f"Session {sessionId}: Triggering AI analysis (buffer: {len(self._contextBuffer)} segments)")
|
||||
await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript)
|
||||
# Periodic trigger (only when no debounce pending)
|
||||
if not self._pendingNameTrigger:
|
||||
shouldTrigger = self._shouldTriggerAnalysis(text)
|
||||
if shouldTrigger:
|
||||
logger.info(f"Session {sessionId}: Periodic trigger (buffer: {len(self._contextBuffer)} segments)")
|
||||
await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript)
|
||||
|
||||
def _isBotSpeaker(self, speaker: str) -> bool:
|
||||
"""Check if a transcript speaker is the bot itself.
|
||||
|
|
@ -755,64 +750,90 @@ class TeamsbotService:
|
|||
def _shouldTriggerAnalysis(self, transcriptText: str, allowPeriodic: bool = True) -> bool:
    """
    Decide whether to trigger AI analysis based on the latest transcript.
    Bot name detection is handled separately via debounce.
    This method only checks periodic/cooldown triggers.

    Note: transcriptText is kept for interface compatibility; the decision
    here depends only on elapsed time since the last AI call.
    """
    elapsed = time.time() - self._lastAiCallTime

    # Cooldown: never call the AI again too soon after the previous call.
    if elapsed < self.config.triggerCooldownSeconds:
        logger.debug(f"Trigger: Cooldown active ({elapsed:.1f}s < {self.config.triggerCooldownSeconds}s)")
        return False

    # Periodic trigger: fire once the configured interval has passed.
    if allowPeriodic and elapsed >= self.config.triggerIntervalSeconds:
        logger.info(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed ({elapsed:.1f}s)")
        return True

    logger.debug(f"Trigger: No trigger ({elapsed:.1f}s / {self.config.triggerIntervalSeconds}s interval)")
    return False
||||
|
||||
def _detectBotName(self, text: str) -> bool:
|
||||
"""Check if text contains the bot's name (exact or phonetically similar)."""
|
||||
botNameLower = self.config.botName.lower()
|
||||
textLower = text.lower()
|
||||
|
||||
if botNameLower in textLower:
|
||||
return True
|
||||
|
||||
botFirstName = botNameLower.split()[0] if " " in botNameLower else botNameLower
|
||||
if len(botFirstName) >= 3:
|
||||
for word in textLower.split():
|
||||
cleanWord = word.strip(".,!?:;\"'()[]")
|
||||
if not cleanWord or len(cleanWord) < 3:
|
||||
continue
|
||||
if cleanWord == botFirstName:
|
||||
return True
|
||||
if cleanWord[0] == botFirstName[0] and abs(len(cleanWord) - len(botFirstName)) <= 2:
|
||||
common = sum(1 for c in set(botFirstName) if c in cleanWord)
|
||||
similarity = common / max(len(set(botFirstName)), len(set(cleanWord)))
|
||||
if similarity >= 0.6:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _setPendingNameTrigger(self, sessionId, interface, voiceInterface, websocket, triggerTranscript) -> bool:
|
||||
"""Set or update a debounced name trigger. Returns True if newly set."""
|
||||
if self._pendingNameTrigger:
|
||||
self._pendingNameTrigger["lastActivity"] = time.time()
|
||||
return False
|
||||
self._pendingNameTrigger = {
|
||||
"sessionId": sessionId,
|
||||
"interface": interface,
|
||||
"voiceInterface": voiceInterface,
|
||||
"websocket": websocket,
|
||||
"triggerTranscript": triggerTranscript,
|
||||
"detectedAt": time.time(),
|
||||
"lastActivity": time.time(),
|
||||
}
|
||||
return True
|
||||
|
||||
async def _checkPendingNameTrigger(self, delaySec: float = 3.0):
|
||||
"""Async loop: fire the pending name trigger once the speaker is quiet."""
|
||||
await asyncio.sleep(delaySec)
|
||||
if not self._pendingNameTrigger:
|
||||
return
|
||||
|
||||
now = time.time()
|
||||
lastActivity = self._pendingNameTrigger.get("lastActivity", 0)
|
||||
detectedAt = self._pendingNameTrigger.get("detectedAt", 0)
|
||||
quietSec = now - lastActivity
|
||||
totalWaitSec = now - detectedAt
|
||||
|
||||
if quietSec >= 3.0 or totalWaitSec >= 15.0:
|
||||
trigger = self._pendingNameTrigger
|
||||
self._pendingNameTrigger = None
|
||||
logger.info(
|
||||
f"Session {trigger['sessionId']}: Debounced name trigger fires "
|
||||
f"(quiet={quietSec:.1f}s, totalWait={totalWaitSec:.1f}s)"
|
||||
)
|
||||
await self._analyzeAndRespond(
|
||||
trigger["sessionId"],
|
||||
trigger["interface"],
|
||||
trigger["voiceInterface"],
|
||||
trigger["websocket"],
|
||||
trigger["triggerTranscript"],
|
||||
)
|
||||
else:
|
||||
remaining = max(0.5, 3.0 - quietSec)
|
||||
asyncio.create_task(self._checkPendingNameTrigger(remaining))
|
||||
|
||||
async def _analyzeAndRespond(
|
||||
self,
|
||||
sessionId: str,
|
||||
|
|
@ -1086,6 +1107,42 @@ class TeamsbotService:
|
|||
|
||||
self._lastBotResponseText = normalizedResponse
|
||||
self._lastBotResponseTs = nowTs
|
||||
|
||||
# Record bot response in transcript (exactly once, regardless of channel)
|
||||
botTranscriptData = TeamsbotTranscript(
|
||||
sessionId=sessionId,
|
||||
speaker=self.config.botName,
|
||||
text=speechResult.responseText,
|
||||
timestamp=getIsoTimestamp(),
|
||||
confidence=1.0,
|
||||
language=self.config.language,
|
||||
isFinal=True,
|
||||
).model_dump()
|
||||
botTranscript = interface.createTranscript(botTranscriptData)
|
||||
|
||||
self._contextBuffer.append({
|
||||
"speaker": self.config.botName,
|
||||
"text": speechResult.responseText,
|
||||
"timestamp": getUtcTimestamp(),
|
||||
"source": "botResponse",
|
||||
})
|
||||
|
||||
await _emitSessionEvent(sessionId, "transcript", {
|
||||
"id": botTranscript.get("id"),
|
||||
"speaker": self.config.botName,
|
||||
"text": speechResult.responseText,
|
||||
"confidence": 1.0,
|
||||
"timestamp": getIsoTimestamp(),
|
||||
"isContinuation": False,
|
||||
"source": "botResponse",
|
||||
"speakerResolvedFromHint": False,
|
||||
})
|
||||
|
||||
# Reset differential writing tracker so next STT creates a new block
|
||||
self._lastTranscriptSpeaker = self.config.botName
|
||||
self._lastTranscriptText = speechResult.responseText
|
||||
self._lastTranscriptId = botTranscript.get("id")
|
||||
|
||||
logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}")
|
||||
|
||||
# Step 5: Execute AI-issued commands (if any)
|
||||
|
|
|
|||
Loading…
Reference in a new issue