# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Service — Conversation & AI analysis logic.

Extracted from service.py. All functions accept `service` (a TeamsbotService
instance) as the first parameter so the class can delegate to them.
"""

import logging
import json
import re
import asyncio
import time
from typing import Optional, Dict, Any, List

from fastapi import WebSocket

from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.shared.timeUtils import getUtcTimestamp

from .datamodelTeamsbot import (
    TeamsbotTranscript,
    TeamsbotBotResponse,
    TeamsbotResponseType,
    TeamsbotResponseMode,
    TeamsbotResponseChannel,
    SpeechTeamsResponse,
    TeamsbotDirectorPromptMode,
    TeamsbotDirectorPromptStatus,
)

logger = logging.getLogger(__name__)

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task whose handle is dropped may be garbage
# collected before it finishes.
_backgroundTasks: set = set()


def _onBackgroundTaskDone(task: "asyncio.Task") -> None:
    """Release a finished background task and log its failure, if any."""
    _backgroundTasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        logger.error(
            f"Background task failed: {type(exc).__name__}: {exc}",
            exc_info=exc,
        )


def _spawnBackgroundTask(coro) -> "asyncio.Task":
    """Schedule ``coro`` as a task and keep it referenced until it is done.

    Returns the created task so callers can still store or cancel it.
    """
    task = asyncio.create_task(coro)
    _backgroundTasks.add(task)
    task.add_done_callback(_onBackgroundTaskDone)
    return task


def _buildTranscriptContext(service) -> str:
    """Render bot name, session context, summary, director context and the
    buffered transcript segments into the context string sent to the AI."""
    contextLines = []
    for segment in service._contextBuffer:
        speaker = segment.get("speaker", "Unknown")
        text = segment.get("text", "")
        segSource = segment.get("source", "caption")
        if service._isBotSpeaker(speaker):
            contextLines.append(f"[YOU ({service.config.botName})]: {text}")
        elif segSource == "chat":
            contextLines.append(f"[Chat: {speaker}]: {text}")
        else:
            contextLines.append(f"[{speaker}]: {text}")

    sessionContextStr = ""
    if service._sessionContext:
        sessionContextStr = f"\nSESSION_CONTEXT (background knowledge provided by the user):\n{service._sessionContext}\n"

    summaryStr = ""
    if service._contextSummary:
        summaryStr = f"\nEARLIER_CONVERSATION_SUMMARY:\n{service._contextSummary}\n"

    directorStr = service._buildPersistentDirectorContext()

    return (
        f"BOT_NAME:{service.config.botName}{sessionContextStr}{summaryStr}{directorStr}\nRECENT_TRANSCRIPT:\n"
        + "\n".join(contextLines)
    )


def _parseSpeechTeamsResponse(content) -> SpeechTeamsResponse:
    """Parse the AI output into a ``SpeechTeamsResponse``.

    Tries the raw content first, then the content of a markdown code fence.
    If both fail, returns a non-responding result carrying the parse error.
    """
    try:
        return SpeechTeamsResponse.model_validate_json(content)
    except Exception:
        pass

    try:
        jsonStr = content
        if "```json" in jsonStr:
            jsonStr = jsonStr.split("```json")[1].split("```")[0]
        elif "```" in jsonStr:
            jsonStr = jsonStr.split("```")[1].split("```")[0]
        return SpeechTeamsResponse.model_validate_json(jsonStr.strip())
    except Exception as parseErr:
        logger.warning(f"Failed to parse SPEECH_TEAMS response: {parseErr}")
        return SpeechTeamsResponse(
            shouldRespond=False,
            reasoning=f"Parse error: {str(parseErr)[:100]}",
            detectedIntent="none",
        )


def _buildBriefingBlock(briefings) -> str:
    """Format the active director briefings as a text block appended to the
    agent task brief. Returns an empty string when there are no briefings."""
    if not briefings:
        return ""

    parts = []
    for b in briefings:
        seg = f"- ({b.get('mode')}) {b.get('text', '')}".rstrip()
        if b.get("fileIds"):
            # Stringify ids so a non-str id cannot break the join.
            seg += f"\n attachedFileIds: {', '.join(map(str, b['fileIds']))}"
        if b.get("note"):
            note = b["note"]
            # Cap the prior analysis at 800 characters.
            seg += (
                "\n priorAgentAnalysis: "
                + (note if len(note) <= 800 else note[:800] + "...")
            )
        parts.append(seg)

    return (
        "\n\nACTIVE_OPERATOR_BRIEFINGS (private; you may read the "
        "attached files via summarizeContent / readFile / "
        "readContentObjects to answer the user precisely; do NOT "
        "quote the directive text itself):\n"
        + "\n".join(parts)
    )


def _resolveResponseChannels(service, speechResult):
    """Return ``(sendVoice, sendChat)`` for a response.

    Channels named by the AI take precedence; otherwise the configured
    ``responseChannel`` is used. ``"both"`` enables voice and chat in either
    case.
    """
    channels = speechResult.responseChannels
    if channels and isinstance(channels, list):
        channelStr = ",".join(str(c).lower().strip() for c in channels)
        wantsBoth = "both" in channelStr
        sendVoice = wantsBoth or "voice" in channelStr
        sendChat = wantsBoth or "chat" in channelStr
        logger.info(f"Response channel (from AI): voice={sendVoice}, chat={sendChat}")
    else:
        channelRaw = service.config.responseChannel
        channelStr = (channelRaw.value if hasattr(channelRaw, 'value') else str(channelRaw)).lower().strip()
        sendVoice = channelStr in ("voice", "both")
        sendChat = channelStr in ("chat", "both")
        logger.info(f"Response channel (from config): '{channelStr}'")
    return sendVoice, sendChat


async def _recordBotResponse(
    sessionId: str,
    interface,
    speechResult,
    response,
    triggerTranscript: Dict[str, Any],
    responseText: str,
    responseType,
) -> None:
    """Persist a bot response, emit the ``botResponse`` session event and
    increment the session's ``botResponseCount``."""
    from .service import _emitSessionEvent

    botResponseData = TeamsbotBotResponse(
        sessionId=sessionId,
        responseText=responseText,
        responseType=responseType,
        detectedIntent=speechResult.detectedIntent,
        reasoning=speechResult.reasoning,
        triggeredByTranscriptId=triggerTranscript.get("id"),
        modelName=response.modelName,
        processingTime=response.processingTime,
        priceCHF=response.priceCHF,
        timestamp=getUtcTimestamp(),
    ).model_dump()
    createdResponse = interface.createBotResponse(botResponseData)

    await _emitSessionEvent(sessionId, "botResponse", {
        "id": createdResponse.get("id"),
        "responseText": responseText,
        "responseType": responseType.value,
        "detectedIntent": speechResult.detectedIntent,
        "reasoning": speechResult.reasoning,
        "modelName": response.modelName,
        "processingTime": response.processingTime,
        "priceCHF": response.priceCHF,
        "timestamp": botResponseData.get("timestamp"),
    })

    session = interface.getSession(sessionId)
    if session:
        count = session.get("botResponseCount", 0) + 1
        interface.updateSession(sessionId, {"botResponseCount": count})


async def _analyzeAndRespond(
    service,
    sessionId: str,
    interface,
    voiceInterface,
    websocket: WebSocket,
    triggerTranscript: Dict[str, Any],
):
    """Run SPEECH_TEAMS AI analysis and respond if needed.

    Skips when an analysis or an agent escalation is already running.
    Otherwise calls the AI with the buffered transcript context and, depending
    on the result: escalates to the agent, sends a stop command, emits a
    suggested response (manual mode), or delivers the response via TTS and/or
    chat and records it. Commands returned by the AI are executed afterwards.

    Args:
        service: TeamsbotService instance holding session state and config.
        sessionId: Session the analysis belongs to.
        interface: Persistence interface (transcripts, bot responses, sessions).
        voiceInterface: Voice interface handed to the TTS helper.
        websocket: Bot websocket; may be falsy, in which case nothing is sent.
        triggerTranscript: Transcript dict that triggered the analysis; only
            its ``id`` is read here.
    """
    from .service import (
        _emitSessionEvent,
        createAiService,
        _speakTextChunked,
    )

    if service._aiAnalysisInProgress:
        logger.info(f"Session {sessionId}: AI analysis already in progress, skipping duplicate trigger")
        return
    if service._agentEscalationInFlight:
        logger.info(
            f"Session {sessionId}: Agent escalation still in flight — "
            f"skipping new SPEECH_TEAMS trigger to prevent overlapping replies"
        )
        return

    service._aiAnalysisInProgress = True
    service._lastAiCallTime = time.time()

    # Everything after raising the flag runs inside try/finally so a failure
    # (including while building the context) can never leave it stuck.
    try:
        transcriptContext = _buildTranscriptContext(service)

        aiService = createAiService(service.currentUser, service.mandateId, service.instanceId)
        await aiService.ensureAiObjectsInitialized()

        request = AiCallRequest(
            prompt=service.config.aiSystemPrompt,
            context=transcriptContext,
            options=AiCallOptions(
                operationType=OperationTypeEnum.SPEECH_TEAMS,
                priority=PriorityEnum.SPEED,
            )
        )
        response = await aiService.callAi(request)

        speechResult = _parseSpeechTeamsResponse(response.content)

        logger.info(
            f"SPEECH_TEAMS result: shouldRespond={speechResult.shouldRespond}, "
            f"intent={speechResult.detectedIntent}, "
            f"reasoning={(speechResult.reasoning or '')[:80]}..."
        )

        await _emitSessionEvent(sessionId, "analysis", {
            "shouldRespond": speechResult.shouldRespond,
            "detectedIntent": speechResult.detectedIntent,
            "reasoning": speechResult.reasoning,
            "modelName": response.modelName,
            "processingTime": response.processingTime,
            "priceCHF": response.priceCHF,
            "needsAgent": speechResult.needsAgent,
            "agentReason": speechResult.agentReason,
        })

        # --- Escalation to the agent -------------------------------------
        if speechResult.needsAgent:
            briefings = service._collectActiveDirectorBriefings()
            briefingFileIds = service._collectDirectorFileIds()
            briefingBlock = _buildBriefingBlock(briefings)

            logger.info(
                f"Session {sessionId}: SPEECH_TEAMS escalates to agent. "
                f"Reason: {speechResult.agentReason or speechResult.reasoning} | "
                f"briefings={len(briefings)}, fileIds={len(briefingFileIds)}"
            )

            taskBrief = (
                (speechResult.agentReason
                 or speechResult.responseText
                 or "Verarbeite die juengste Sprecheranfrage und antworte ins Meeting.")
                + briefingBlock
            )

            # The flag is released by _runEscalationAndRelease when the run ends.
            service._agentEscalationInFlight = True
            service._currentEscalationTask = _spawnBackgroundTask(
                _runEscalationAndRelease(
                    service,
                    sessionId=sessionId,
                    taskBrief=taskBrief,
                    briefingFileIds=briefingFileIds,
                    triggerTranscriptId=triggerTranscript.get("id"),
                )
            )
            return

        # --- Stop intent --------------------------------------------------
        if speechResult.detectedIntent == "stop":
            logger.info(f"Session {sessionId}: AI detected STOP intent: {speechResult.reasoning}")
            if websocket:
                try:
                    await websocket.send_text(json.dumps({
                        "type": "stopAudio",
                        "sessionId": sessionId,
                    }))
                except Exception as stopErr:
                    logger.warning(f"Failed to send stop command: {stopErr}")
            return

        # --- Direct response ----------------------------------------------
        if speechResult.shouldRespond and speechResult.responseText:
            if service.config.responseMode == TeamsbotResponseMode.MANUAL:
                # Manual mode only suggests the response; nothing is sent.
                await _emitSessionEvent(sessionId, "suggestedResponse", {
                    "responseText": speechResult.responseText,
                    "detectedIntent": speechResult.detectedIntent,
                    "reasoning": speechResult.reasoning,
                })
                return

            sendVoice, sendChat = _resolveResponseChannels(service, speechResult)

            if sendVoice and sendChat:
                responseType = TeamsbotResponseType.BOTH
            elif sendVoice:
                responseType = TeamsbotResponseType.AUDIO
            else:
                responseType = TeamsbotResponseType.CHAT

            canonicalText = (
                speechResult.responseText
                or speechResult.responseTextForVoice
                or speechResult.responseTextForChat
                or ""
            )
            normalizedResponse = (canonicalText or "").strip().lower()
            nowTs = time.time()
            if (
                normalizedResponse
                and service._lastBotResponseText == normalizedResponse
                and (nowTs - service._lastBotResponseTs) < 90
            ):
                logger.info(f"Session {sessionId}: Suppressing duplicate bot response within 90s window")
                await _emitSessionEvent(sessionId, "analysis", {
                    "shouldRespond": False,
                    "detectedIntent": speechResult.detectedIntent,
                    "reasoning": "Suppressed duplicate response within 90s",
                    "modelName": response.modelName,
                    "processingTime": response.processingTime,
                    "priceCHF": response.priceCHF,
                })
                return

            textForVoice = speechResult.responseTextForVoice or speechResult.responseText
            textForChat = speechResult.responseTextForChat or speechResult.responseText
            storedText = textForChat or textForVoice or speechResult.responseText

            if sendVoice and textForVoice:
                await _emitSessionEvent(sessionId, "ttsDeliveryStatus", {
                    "status": "requested",
                    "hasWebSocket": websocket is not None,
                    "message": "TTS generation requested",
                    "timestamp": getUtcTimestamp(),
                })
                logger.info(
                    f"Session {sessionId}: TTS requested (websocket_available={websocket is not None})"
                )
                if not websocket:
                    logger.warning(
                        f"Session {sessionId}: TTS skipped (bot websocket unavailable, likely fallback mode)"
                    )
                    await _emitSessionEvent(sessionId, "ttsDeliveryStatus", {
                        "status": "unavailable",
                        "hasWebSocket": False,
                        "message": "TTS skipped — bot websocket unavailable",
                        "timestamp": getUtcTimestamp(),
                    })
                    # Fall back to chat when voice cannot be delivered.
                    sendChat = True
                else:
                    spokenText = await _summarizeForVoice(service, sessionId, textForVoice)
                    cancelHook = service._makeAnswerCancelHook()
                    async with service._meetingTtsLock:
                        ttsOutcome = await _speakTextChunked(
                            websocket=websocket,
                            voiceInterface=voiceInterface,
                            sessionId=sessionId,
                            voiceText=spokenText,
                            languageCode=service.config.language,
                            voiceName=service.config.voiceId,
                            isCancelled=cancelHook,
                        )
                    if ttsOutcome.get("success"):
                        logger.info(
                            f"Session {sessionId}: TTS audio dispatched to bot "
                            f"(chunks={ttsOutcome.get('chunks')}, played={ttsOutcome.get('played')})"
                        )
                        await _emitSessionEvent(sessionId, "ttsDeliveryStatus", {
                            "status": "dispatched",
                            "hasWebSocket": True,
                            "chunks": ttsOutcome.get("chunks"),
                            "played": ttsOutcome.get("played"),
                            "timestamp": getUtcTimestamp(),
                        })
                    else:
                        logger.warning(
                            f"TTS failed for session {sessionId}: {ttsOutcome.get('error')}"
                        )
                        await _emitSessionEvent(sessionId, "ttsDeliveryStatus", {
                            "status": "failed",
                            "hasWebSocket": True,
                            "chunks": ttsOutcome.get("chunks"),
                            "played": ttsOutcome.get("played"),
                            "message": ttsOutcome.get("error"),
                            "timestamp": getUtcTimestamp(),
                        })
                        # Fall back to chat when TTS failed.
                        sendChat = True

            if sendChat and textForChat:
                if websocket:
                    try:
                        await websocket.send_text(json.dumps({
                            "type": "sendChatMessage",
                            "sessionId": sessionId,
                            "text": textForChat,
                        }))
                        logger.info(f"Chat response sent for session {sessionId}")
                    except Exception as chatErr:
                        logger.warning(f"Chat message send failed for session {sessionId}: {chatErr}")
                else:
                    logger.warning(
                        f"Session {sessionId}: Chat response skipped (bot websocket unavailable)"
                    )

            await _recordBotResponse(
                sessionId,
                interface,
                speechResult,
                response,
                triggerTranscript,
                storedText,
                responseType,
            )

            # Remember the response for the duplicate-suppression check above.
            service._lastBotResponseText = normalizedResponse
            service._lastBotResponseTs = nowTs

            # Store the bot's own answer as a transcript entry as well.
            botTranscriptData = TeamsbotTranscript(
                sessionId=sessionId,
                speaker=service.config.botName,
                text=storedText,
                timestamp=getUtcTimestamp(),
                confidence=1.0,
                language=service.config.language,
                isFinal=True,
            ).model_dump()
            botTranscript = interface.createTranscript(botTranscriptData)

            service._contextBuffer.append({
                "speaker": service.config.botName,
                "text": storedText,
                "timestamp": getUtcTimestamp(),
                "source": "botResponse",
            })

            await _emitSessionEvent(sessionId, "transcript", {
                "id": botTranscript.get("id"),
                "speaker": service.config.botName,
                "text": storedText,
                "confidence": 1.0,
                "timestamp": getUtcTimestamp(),
                "isContinuation": False,
                "source": "botResponse",
                "speakerResolvedFromHint": False,
            })

            service._lastTranscriptSpeaker = service.config.botName
            service._lastTranscriptText = storedText
            service._lastTranscriptId = botTranscript.get("id")

            service._followUpWindowEnd = time.time() + 15.0
            logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}, follow-up window open for 15s")

        # --- Commands -----------------------------------------------------
        if speechResult.commands:
            from .serviceCommands import _executeCommands
            await _executeCommands(service, sessionId, speechResult.commands, voiceInterface, websocket)

            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source; this branch iterates the commands, so it is kept inside
            # the `if speechResult.commands` guard — confirm.
            if speechResult.shouldRespond and not speechResult.responseText:
                cmdTexts = [
                    c.params.get("text", "")
                    for c in speechResult.commands
                    if c.action == "sendChat" and c.params and c.params.get("text")
                ]
                combinedText = " ".join(cmdTexts) if cmdTexts else None
                if combinedText:
                    await _recordBotResponse(
                        sessionId,
                        interface,
                        speechResult,
                        response,
                        triggerTranscript,
                        combinedText,
                        TeamsbotResponseType.CHAT,
                    )
                    service._followUpWindowEnd = time.time() + 15.0
                    logger.info(
                        f"Bot responded via commands in session {sessionId}: "
                        f"intent={speechResult.detectedIntent}, follow-up window open for 15s"
                    )

    except Exception as e:
        logger.error(f"SPEECH_TEAMS analysis failed for session {sessionId}: {type(e).__name__}: {e}", exc_info=True)
        await _emitSessionEvent(sessionId, "error", {"message": f"AI analysis failed: {type(e).__name__}: {str(e)}"})
    finally:
        service._aiAnalysisInProgress = False


async def _processTranscript(
    service,
    sessionId: str,
    speaker: str,
    text: str,
    isFinal: bool,
    interface,
    voiceInterface,
    websocket: WebSocket,
    source: str = "caption",
    speakerResolvedFromHint: Optional[bool] = None,
):
    """Process a transcript segment from captions or chat messages.

    Registers speaker hints, stores or merges the segment, keeps the context
    buffer bounded, emits a ``transcript`` session event and finally decides
    whether an AI analysis should be triggered (stop phrase, bot name,
    follow-up window or periodic trigger).

    Args:
        service: TeamsbotService instance holding session state and config.
        sessionId: Session the segment belongs to.
        speaker: Speaker name attached to the segment.
        text: Segment text; blank text is ignored.
        isFinal: Whether the segment is final; non-final segments never
            trigger analysis.
        interface: Persistence interface (transcripts, sessions).
        voiceInterface: Voice interface forwarded to the analysis trigger.
        websocket: Bot websocket forwarded to the analysis trigger.
        source: Origin of the segment; values handled here are "caption",
            "speakerHint", "chatHistory", "chat" and "audioCapture".
        speakerResolvedFromHint: Forwarded in the emitted event; ``None`` is
            reported as ``False``.
    """
    from .service import _emitSessionEvent

    text = (text or "").strip()
    if not text:
        return

    if source in ("caption", "speakerHint"):
        service._registerSpeakerHint(speaker, text, sessionId)

        if (
            source == "speakerHint"
            and isFinal
            and not service._isBotSpeaker(speaker)
            and service.config.responseMode != TeamsbotResponseMode.TRANSCRIBE_ONLY
            and service._detectBotName(text)
        ):
            # No transcript row exists for a hint, hence id=None.
            triggerTranscript = {"id": None, "speaker": speaker, "text": text, "source": source}
            isNew = service._setPendingNameTrigger(sessionId, interface, voiceInterface, websocket, triggerTranscript)
            if isNew:
                logger.info(f"Session {sessionId}: Bot name in caption, debounce trigger started")
                _spawnBackgroundTask(_checkPendingNameTrigger(service))
                service._currentQuickAckTask = _spawnBackgroundTask(
                    _runQuickAck(service, sessionId)
                )
        # NOTE(review): nesting reconstructed from a whitespace-collapsed
        # source; captions/hints are assumed to be attribution-only and never
        # stored as transcripts — confirm this return level.
        return

    if source == "chatHistory":
        # Historic chat messages are stored and shown but never trigger the AI.
        transcriptData = TeamsbotTranscript(
            sessionId=sessionId,
            speaker=speaker,
            text=text,
            timestamp=getUtcTimestamp(),
            confidence=1.0,
            language=service.config.language,
            isFinal=True,
            source="chatHistory",
        ).model_dump()
        createdTranscript = interface.createTranscript(transcriptData)
        await _emitSessionEvent(sessionId, "transcript", {
            "id": createdTranscript.get("id"),
            "speaker": speaker,
            "text": text,
            "confidence": 1.0,
            "timestamp": getUtcTimestamp(),
            "isContinuation": False,
            "source": "chatHistory",
            "isHistory": True,
        })
        logger.debug(f"Session {sessionId}: Chat history stored (no AI trigger): [{speaker}] {text[:60]}")
        return

    isBotSpeaker = service._isBotSpeaker(speaker)
    if isBotSpeaker and source != "chat":
        logger.debug(f"Session {sessionId}: Ignoring own bot caption from: [{speaker}] {text[:80]}...")
        return

    # Consecutive audio chunks of the same speaker within this pause are
    # merged into the previous transcript row instead of creating a new one.
    sttPauseThreshold = 5.0
    isMerge = (
        source == "audioCapture"
        and service._lastTranscriptSpeaker == speaker
        and service._lastTranscriptText is not None
        and service._lastTranscriptId is not None
        and (time.time() - service._lastSttTime) < sttPauseThreshold
    )

    if isMerge:
        mergedText = f"{service._lastTranscriptText} {text}"
        interface.updateTranscript(service._lastTranscriptId, {
            "text": mergedText,
            "isFinal": isFinal,
        })
        service._lastTranscriptText = mergedText
        createdTranscript = {"id": service._lastTranscriptId}

        if service._contextBuffer and service._contextBuffer[-1].get("speaker") == speaker:
            service._contextBuffer[-1]["text"] = mergedText
    else:
        transcriptData = TeamsbotTranscript(
            sessionId=sessionId,
            speaker=speaker,
            text=text,
            timestamp=getUtcTimestamp(),
            confidence=1.0,
            language=service.config.language,
            isFinal=isFinal,
            source=source,
        ).model_dump()
        createdTranscript = interface.createTranscript(transcriptData)

        service._lastTranscriptSpeaker = speaker
        service._lastTranscriptText = text
        service._lastTranscriptId = createdTranscript.get("id")

        if source == "audioCapture" and speaker == "Unknown":
            service._unattributedTranscriptIds.append(createdTranscript.get("id"))

        service._contextBuffer.append({
            "speaker": speaker or "Unknown",
            "text": text,
            "timestamp": getUtcTimestamp(),
            "source": source,
        })

        maxSegments = service.config.contextWindowSegments
        if len(service._contextBuffer) > maxSegments:
            # NOTE(review): the buffer is trimmed on every overflow, so the
            # 1.5x condition looks hard to reach — confirm intended nesting.
            if not service._contextSummary and len(service._contextBuffer) > maxSegments * 1.5:
                _spawnBackgroundTask(service._summarizeContextBuffer(sessionId))
            service._contextBuffer = service._contextBuffer[-maxSegments:]

        # NOTE(review): counter placement reconstructed; assumed to count only
        # newly created segments (not merges) — confirm.
        session = interface.getSession(sessionId)
        if session:
            count = session.get("transcriptSegmentCount", 0) + 1
            interface.updateSession(sessionId, {"transcriptSegmentCount": count})

    if source == "audioCapture":
        service._lastSttTime = time.time()

    displayText = service._lastTranscriptText if isMerge else text
    await _emitSessionEvent(sessionId, "transcript", {
        "id": createdTranscript.get("id"),
        "speaker": speaker,
        "text": displayText,
        "confidence": 1.0,
        "timestamp": getUtcTimestamp(),
        "isContinuation": isMerge,
        "source": source,
        "speakerResolvedFromHint": (
            speakerResolvedFromHint if speakerResolvedFromHint is not None else False
        ),
    })

    # --- Trigger decision -------------------------------------------------
    if not isFinal:
        return
    if service.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY:
        return
    if source == "chat" and isBotSpeaker:
        return

    if service._isStopPhrase(text):
        logger.info(
            f"Session {sessionId}: Stop phrase detected ('{text.strip()[:60]}'), "
            f"hard-cancelling in-flight speech immediately"
        )
        from .serviceWebSocket import _cancelInFlightSpeech
        await _cancelInFlightSpeech(
            service,
            sessionId=sessionId,
            websocket=websocket,
            reason="userStopPhrase",
        )
        return

    # Any further speech postpones a pending debounced trigger.
    if service._pendingNameTrigger:
        service._pendingNameTrigger["lastActivity"] = time.time()

    if service._detectBotName(text):
        isNew = service._setPendingNameTrigger(sessionId, interface, voiceInterface, websocket, createdTranscript)
        if isNew:
            _spawnBackgroundTask(_checkPendingNameTrigger(service))
            service._currentQuickAckTask = _spawnBackgroundTask(
                _runQuickAck(service, sessionId)
            )
        return

    if (
        source == "audioCapture"
        and not service._isBotSpeaker(speaker)
        and time.time() < service._followUpWindowEnd
        and not service._pendingNameTrigger
    ):
        isNew = service._setPendingNameTrigger(sessionId, interface, voiceInterface, websocket, createdTranscript)
        if isNew:
            logger.info(f"Session {sessionId}: Follow-up window trigger (no name needed)")
            _spawnBackgroundTask(_checkPendingNameTrigger(service))
        return

    if not service._pendingNameTrigger:
        shouldTrigger = service._shouldTriggerAnalysis(text)
        if shouldTrigger:
            logger.info(f"Session {sessionId}: Periodic trigger (buffer: {len(service._contextBuffer)} segments)")
            await _analyzeAndRespond(service, sessionId, interface, voiceInterface, websocket, createdTranscript)


async def _summarizeForVoice(
    service,
    sessionId: str,
    rawAnswer: str,
) -> str:
    """Return a SHORT, naturally-spoken paraphrase of ``rawAnswer`` for TTS.

    Short, unstructured answers are returned after voice sanitising only.
    Longer or structured answers are condensed by an AI call; if that call
    fails or returns nothing usable, the sanitised text truncated to
    ``service._VOICE_DIRECT_MAX_CHARS`` is returned instead.

    Args:
        service: TeamsbotService instance (config, limits, credentials).
        sessionId: Session id, used for logging only.
        rawAnswer: The full written answer.

    Returns:
        The text to speak; an empty string for blank input.
    """
    from .service import _voiceFriendlyMeetingText, createAiService

    if not rawAnswer or not rawAnswer.strip():
        return ""

    sanitised = _voiceFriendlyMeetingText(rawAnswer)
    if (
        len(sanitised) <= service._VOICE_DIRECT_MAX_CHARS
        and not service._looksLikeStructuredText(rawAnswer)
    ):
        return sanitised

    targetLang = (service.config.language or "de-DE").strip()
    botName = (service.config.botName or "").strip() or "the assistant"
    persona = (service.config.aiSystemPrompt or "").strip()
    personaBlock = (
        f"\n\nBOT PERSONA / TONE:\n{persona}\n" if persona else ""
    )
    prompt = (
        f"You are condensing a long written answer into a SHORT spoken "
        f"paraphrase that the assistant '{botName}' will say out loud "
        f"into a Microsoft Teams meeting. The full written answer is "
        f"already in the meeting chat — your job is to summarise it for "
        f"the EAR, not the eye.\n\n"
        f"STRICT REQUIREMENTS:\n"
        f"1. Output language: BCP-47 '{targetLang}'. No other language.\n"
        f"2. 1 to 3 sentences, max ~{service._VOICE_SUMMARY_MAX_CHARS} characters total.\n"
        f"3. Natural spoken style — no headings, no bullet points, no "
        f"tables, no markdown, no emojis, no enumerations like 'Erstens... "
        f"Zweitens...' unless that genuinely flows in speech.\n"
        f"4. Capture the essence and the most important conclusion. Do "
        f"NOT try to fit every detail. Listeners can read the chat for "
        f"the full version.\n"
        f"5. End by gently pointing the audience to the chat for details, "
        f"e.g. 'Details stehen im Chat.' (adapted to the target language).\n"
        f"6. Output ONLY the spoken text. No JSON, no quotes around it, "
        f"no preamble like 'Here is the summary:'.\n"
        f"{personaBlock}\n"
        f"FULL WRITTEN ANSWER (markdown-formatted, sometimes long):\n"
        f"---\n{rawAnswer.strip()[:6000]}\n---\n"
    )

    fallback = sanitised[: service._VOICE_DIRECT_MAX_CHARS]

    try:
        aiService = createAiService(
            service.currentUser, service.mandateId, service.instanceId
        )
        await aiService.ensureAiObjectsInitialized()
        request = AiCallRequest(
            prompt=prompt,
            context="",
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.SPEED,
            ),
        )
        response = await aiService.callAi(request)
    except Exception as aiErr:
        logger.warning(
            f"Session {sessionId}: Voice summary AI call failed: {aiErr}"
        )
        return fallback

    if not response or response.errorCount != 0 or not response.content:
        logger.warning(
            f"Session {sessionId}: Voice summary returned empty/error"
        )
        return fallback

    spoken = response.content.strip()
    spoken = _voiceFriendlyMeetingText(spoken)
    if not spoken:
        return fallback

    logger.info(
        f"Session {sessionId}: Voice summary generated "
        f"(orig={len(rawAnswer)} chars, sanitised={len(sanitised)}, "
        f"spoken={len(spoken)})"
    )
    return spoken


async def _pickQuickAckText(service) -> Optional[str]:
    """Return a short ack text in the bot's configured language."""
    return await _pickEphemeralPhrase(service, "quickAck")


async def _pickEphemeralPhrase(
    service,
    kind: str,
    substitutions: Optional[Dict[str, Any]] = None,
) -> Optional[str]:
    """Round-robin selector over the cached phrase pool for ``kind``.

    Args:
        service: TeamsbotService instance holding the pool and its index.
        kind: Phrase kind to pick from.
        substitutions: Optional ``str.format`` values applied to the phrase;
            on a formatting error the unformatted phrase is returned.

    Returns:
        The chosen phrase, or ``None`` when no variants are available.
    """
    variants = await _getEphemeralPhrases(service, kind)
    if not variants:
        return None

    idx = service._phrasePoolIdx.get(kind, 0) % len(variants)
    service._phrasePoolIdx[kind] = (idx + 1) % len(variants)
    chosen = variants[idx]

    if substitutions:
        try:
            chosen = chosen.format(**substitutions)
        except (KeyError, IndexError, ValueError) as fmtErr:
            logger.debug(
                f"Ephemeral phrase substitution failed for kind={kind}: {fmtErr}"
            )
    return chosen


async def _getEphemeralPhrases(service, kind: str) -> List[str]:
    """Return the cached pool of AI-generated variants for ``kind``.

    Generates the pool on first use. The cache is re-checked after acquiring
    ``service._phrasePoolLock`` so concurrent callers do not generate twice.
    An empty generation result is returned but not cached.
    """
    cached = service._phrasePool.get(kind)
    if cached:
        return cached

    async with service._phrasePoolLock:
        cached = service._phrasePool.get(kind)
        if cached:
            return cached
        phrases = await _generateEphemeralPhrases(service, kind, 4)
        if phrases:
            service._phrasePool[kind] = phrases
        return phrases


async def _generateEphemeralPhrases(
    service, kind: str, count: int
) -> List[str]:
    """Ask the AI to produce ``count`` short utterances for ``kind``.

    Returns:
        Up to ``count`` non-empty strings, or an empty list when the kind is
        unknown, the AI call fails, or the answer is not a JSON array.
    """
    from .service import createAiService, _EPHEMERAL_PHRASE_INTENTS

    intent = _EPHEMERAL_PHRASE_INTENTS.get(kind)
    if not intent:
        logger.warning(f"Unknown ephemeral phrase kind requested: {kind}")
        return []

    targetLang = (service.config.language or "").strip() or "en-US"
    botName = (service.config.botName or "the assistant").strip()
    persona = (service.config.aiSystemPrompt or "").strip()

    prompt = (
        f"You are localizing short SPOKEN-LANGUAGE utterances for a "
        f"meeting assistant named '{botName}'.\n\n"
        f"Persona / style guide for the assistant:\n"
        f"{persona or '(no persona configured — use a neutral, polite, professional tone)'}\n\n"
        f"Target spoken language (BCP-47 code): {targetLang}\n\n"
        f"Utterance intent:\n{intent}\n\n"
        f"Generate {count} DIFFERENT variants matching this intent, in "
        f"the target language. Variants should feel natural when spoken "
        f"aloud, not robotic. Do NOT include the assistant's name in "
        f"the variants.\n\n"
        f"Output STRICTLY a JSON array of {count} plain-text strings, "
        f"with no markdown fences, no commentary, no surrounding "
        f"quotation marks beyond the JSON syntax itself. Example "
        f"format: [\"...\", \"...\", \"...\", \"...\"]"
    )

    try:
        aiService = createAiService(
            service.currentUser, service.mandateId, service.instanceId
        )
        await aiService.ensureAiObjectsInitialized()
        request = AiCallRequest(
            prompt=prompt,
            context="",
            options=AiCallOptions(
                operationType=OperationTypeEnum.DATA_ANALYSE,
                priority=PriorityEnum.SPEED,
            ),
        )
        response = await aiService.callAi(request)
    except Exception as aiErr:
        logger.warning(
            f"Ephemeral phrase generation failed (kind={kind}, lang={targetLang}): {aiErr}"
        )
        return []

    if not response or response.errorCount != 0 or not response.content:
        logger.warning(
            f"Ephemeral phrase generation returned empty/error "
            f"(kind={kind}, lang={targetLang})"
        )
        return []

    # Strip an optional surrounding markdown fence before parsing.
    raw = response.content.strip()
    raw = re.sub(r"^```(?:json)?\s*", "", raw)
    raw = re.sub(r"\s*```\s*$", "", raw)

    try:
        arr = json.loads(raw)
    except json.JSONDecodeError as parseErr:
        logger.warning(
            f"Ephemeral phrase generation: could not parse JSON "
            f"(kind={kind}, lang={targetLang}): {parseErr} "
            f"raw={raw[:200]}"
        )
        return []

    if not isinstance(arr, list):
        return []

    cleaned = [v.strip() for v in arr if isinstance(v, str) and v.strip()]
    cleaned = cleaned[:count]
    if cleaned:
        logger.info(
            f"Ephemeral phrase pool generated (kind={kind}, "
            f"lang={targetLang}, count={len(cleaned)})"
        )
    return cleaned


async def _runQuickAck(service, sessionId: str) -> None:
    """Background task: speak a short ack into the meeting via TTS.

    Does nothing when no websocket/voice interface is attached, when
    ``service._shouldFireQuickAck()`` declines, or when no ack text is
    available. ``service._currentQuickAckTask`` is cleared on every exit path
    so the handle never points at a finished task.
    """
    from .service import _emitSessionEvent, _speakTextChunked

    try:
        websocket = service._websocket
        voiceInterface = service._voiceInterface
        if websocket is None or voiceInterface is None:
            return
        if not service._shouldFireQuickAck():
            return

        ackText = await _pickQuickAckText(service)
        if not ackText:
            return

        service._lastQuickAckTs = time.time()

        await _emitSessionEvent(sessionId, "quickAck", {
            "text": ackText,
            "timestamp": getUtcTimestamp(),
        })
        cancelHook = service._makeAnswerCancelHook()
        async with service._meetingTtsLock:
            outcome = await _speakTextChunked(
                websocket=websocket,
                voiceInterface=voiceInterface,
                sessionId=sessionId,
                voiceText=ackText,
                languageCode=service.config.language,
                voiceName=service.config.voiceId,
                isCancelled=cancelHook,
            )
        if not outcome.get("success"):
            logger.info(
                f"Session {sessionId}: Quick ack TTS failed silently "
                f"({outcome.get('error')}) — main response will still go through"
            )
    except asyncio.CancelledError:
        logger.info(f"Session {sessionId}: Quick ack cancelled by stop signal")
    except Exception as ackErr:
        logger.warning(f"Session {sessionId}: Quick ack failed: {ackErr}")
    finally:
        service._currentQuickAckTask = None


async def _checkPendingNameTrigger(service, delaySec: float = 3.0):
    """Async loop: fire the pending name trigger once the speaker is quiet.

    Sleeps ``delaySec``, then fires the pending trigger if there has been no
    activity for at least 3 seconds or the trigger has been waiting for at
    least 15 seconds. Otherwise it re-schedules itself for the remaining quiet
    time (at least 0.5 seconds).
    """
    await asyncio.sleep(delaySec)

    if not service._pendingNameTrigger:
        return

    now = time.time()
    lastActivity = service._pendingNameTrigger.get("lastActivity", 0)
    detectedAt = service._pendingNameTrigger.get("detectedAt", 0)
    quietSec = now - lastActivity
    totalWaitSec = now - detectedAt

    if quietSec >= 3.0 or totalWaitSec >= 15.0:
        # Clear the pending trigger before the analysis runs.
        trigger = service._pendingNameTrigger
        service._pendingNameTrigger = None
        logger.info(
            f"Session {trigger['sessionId']}: Debounced name trigger fires "
            f"(quiet={quietSec:.1f}s, totalWait={totalWaitSec:.1f}s)"
        )
        await _analyzeAndRespond(
            service,
            trigger["sessionId"],
            trigger["interface"],
            trigger["voiceInterface"],
            trigger["websocket"],
            trigger["triggerTranscript"],
        )
    else:
        remaining = max(0.5, 3.0 - quietSec)
        _spawnBackgroundTask(_checkPendingNameTrigger(service, remaining))


async def _warmEphemeralPhrasePool(service, sessionId: str) -> None:
    """Fire-and-forget: generate ephemeral phrase pool for all kinds.

    A failure for one kind is logged and does not stop the remaining kinds.
    """
    from .service import _EPHEMERAL_PHRASE_INTENTS

    try:
        for kind in _EPHEMERAL_PHRASE_INTENTS:
            try:
                await _getEphemeralPhrases(service, kind)
            except Exception as innerErr:
                logger.warning(
                    f"Session {sessionId}: Phrase pool warmup failed for "
                    f"kind={kind}: {innerErr}"
                )
    except Exception as warmErr:
        logger.warning(
            f"Session {sessionId}: Phrase pool warmup task crashed: {warmErr}"
        )


async def _runEscalationAndRelease(
    service,
    sessionId: str,
    taskBrief: str,
    briefingFileIds: List[str],
    triggerTranscriptId: Optional[str],
) -> None:
    """Background wrapper for ``_runAgentForMeeting`` that holds the
    ``_agentEscalationInFlight`` flag for the duration of the agent run.

    Cancellation and errors are logged and swallowed; the flag and
    ``service._currentEscalationTask`` are always reset when the run ends.
    """
    try:
        await service._runAgentForMeeting(
            sessionId=sessionId,
            taskText=taskBrief,
            fileIds=briefingFileIds,
            sourceLabel="speechEscalation",
            triggerTranscriptId=triggerTranscriptId,
        )
    except asyncio.CancelledError:
        logger.info(
            f"Session {sessionId}: Escalation agent task cancelled by stop signal"
        )
    except Exception as escErr:
        logger.error(
            f"Session {sessionId}: Escalation agent task failed: "
            f"{type(escErr).__name__}: {escErr}",
            exc_info=True,
        )
    finally:
        service._agentEscalationInFlight = False
        service._currentEscalationTask = None