Merge pull request #93 from valueonag/feat/auxiliaries

Feat/auxiliaries
This commit is contained in:
Patrick Motsch 2026-02-13 08:55:16 +01:00 committed by GitHub
commit 722728720e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 2094 additions and 2 deletions

View file

@ -57,6 +57,9 @@ Connector_GoogleSpeech_API_KEY_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpETk5FWWM3Q0JKMzhI
# Feature SyncDelta JIRA configuration
Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = DEV_ENC:Z0FBQUFBQm8xSUpEbm0yRUJ6VUJKbUwyRW5kMnRaNW4wM2YxMkJUTXVXZUdmdVRCaUZIVHU2TTV2RWZLRmUtZkcwZE4yRUNlNDQ0aUJWYjNfdVg5YjV5c2JwMHhoUUYxZWdkeS11bXR0eGxRLWRVaVU3cUVQZWJlNDRtY1lWUDdqeDVFSlpXS0VFX21WajlRS3lHQjc0bS11akkybWV3QUFlR2hNWUNYLUdiRjZuN2dQODdDSExXWG1Dd2ZGclI2aUhlSWhETVZuY3hYdnhkb2c2LU1JTFBvWFpTNmZtMkNVOTZTejJwbDI2eGE0OS1xUlIwQnlCSmFxRFNCeVJNVzlOMDhTR1VUamx4RDRyV3p6Tk9qVHBrWWdySUM3TVRaYjd3N0JHMFhpdzFhZTNDLTFkRVQ2RVE4U19COXRhRWtNc0NVOHRqUS1CRDFpZ19xQmtFLU9YSDU3TXBZQXpVcld3PT0=
# Teamsbot Media Bridge
TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = True
APP_DEBUG_CHAT_WORKFLOW_DIR = D:/Athi/Local/Web/poweron/local/debug

View file

@ -57,6 +57,9 @@ Connector_GoogleSpeech_API_KEY_SECRET = INT_ENC:Z0FBQUFBQm8xSVRkNmVXZ1pWcHcydTF2
# Feature SyncDelta JIRA configuration
Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = INT_ENC:Z0FBQUFBQm8xSVRkTUNsWm4wX0p6eXFDZmJ4dFdHNEs1MV9MUzdrb3RzeC1jVWVYZ0REWHRyZkFiaGZLcUQtTXFBZzZkNzRmQ0gxbEhGbUNlVVFfR1JEQTc0aldkZkgyWnBOcjdlUlZxR0tDTEdKRExULXAyUEtsVmNTMkRKU1BJNnFiM0hlMXo4YndMcHlRMExtZDQ3Zm9vNFhMcEZCcHpBPT0=
# Teamsbot Media Bridge
TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE
APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat

View file

@ -57,6 +57,9 @@ Connector_GoogleSpeech_API_KEY_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4NFQxaF9uN3h1cVB
# Feature SyncDelta JIRA configuration
Feature_SyncDelta_JIRA_DELTA_TOKEN_SECRET = PROD_ENC:Z0FBQUFBQnBDM1Z4d3Z4d2x6N1FhUktMU0RKbkxfY2pTQkRzXzJ6UXVEbDNCaFM3UHMtQVFGYzNmYWs4N0lMM1R2SFJuZTVFVmx6MGVEbXc5U3NOTnY1TWN0ZDNaamlHQWloalM3VldmREJNSHQ1TlVkSVFJMTVhQWVGSVRMTGw4UTBqNGlQZFVuaHp4WUlKemR5UnBXZlh0REJFLXJ4ejR3PT0=
# Teamsbot Media Bridge
TEAMSBOT_BRIDGE_URL = https://media.poweron.swiss:9440
# Debug Configuration
APP_DEBUG_CHAT_WORKFLOW_ENABLED = FALSE
APP_DEBUG_CHAT_WORKFLOW_DIR = ./test-chat

View file

@ -26,6 +26,9 @@ class OperationTypeEnum(str, Enum):
WEB_SEARCH_DATA = "webSearch" # Returns list of URLs only
WEB_CRAWL = "webCrawl" # Web crawl for a given URL
# Speech Operations (dedicated pipeline, bypasses standard model selection)
SPEECH_TEAMS = "speechTeams" # Teams Meeting AI analysis: decide if/how to respond
# Operation Type Rating - Helper class for capability ratings
class OperationTypeRating(BaseModel):

View file

@ -0,0 +1,2 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.

View file

@ -0,0 +1,121 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Bridge Connector - Communication with the .NET Media Bridge.
Handles HTTP and WebSocket communication for meeting join/leave and audio streaming.
"""
import logging
import aiohttp
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
# Default timeout for bridge HTTP calls: every request made through
# BridgeConnector is capped at 30 seconds in total.
_BRIDGE_TIMEOUT = aiohttp.ClientTimeout(total=30)
class BridgeConnector:
    """HTTP client for the .NET Media Bridge service.

    Every call opens a short-lived ``aiohttp.ClientSession`` that uses
    ``_BRIDGE_TIMEOUT``. Failures are never raised to the caller: they are
    logged and reported through the returned dict.
    """

    def __init__(self, bridgeUrl: Optional[str] = None):
        """Store the bridge base URL.

        Args:
            bridgeUrl: Base URL of the bridge. ``None`` or an empty string
                leaves the connector unconfigured.
        """
        # Trailing slashes are dropped so endpoint paths can be appended as "/bridge/...".
        self.bridgeUrl = (bridgeUrl or "").rstrip("/")

    def _isConfigured(self) -> bool:
        """Check if the bridge URL is configured."""
        return bool(self.bridgeUrl)

    async def joinMeeting(
        self,
        meetingLink: str,
        botName: str,
        backgroundImageUrl: Optional[str],
        sessionId: str,
        gatewayCallbackUrl: str,
        gatewayWsUrl: str,
        gatewayBaseUrl: str = "",
    ) -> Dict[str, Any]:
        """Send the join command to the .NET Media Bridge.

        Args:
            meetingLink: Teams meeting join link.
            botName: Display name the bot should use.
            backgroundImageUrl: Optional background image URL, forwarded as-is.
            sessionId: Gateway session ID used for correlation.
            gatewayCallbackUrl: Callback URL forwarded to the bridge.
            gatewayWsUrl: WebSocket URL forwarded to the bridge.
            gatewayBaseUrl: Base URL of this gateway instance so the bridge can
                build full callback/WebSocket URLs per-session.

        Returns:
            Dict with 'success' bool and 'bridgeSessionId' or 'error' string.
            Without a configured bridge URL a simulated success is returned.
        """
        if not self._isConfigured():
            logger.warning("Bridge URL not configured. Simulating join for development.")
            return {
                "success": True,
                "bridgeSessionId": f"dev-{sessionId[:8]}",
                "message": "Development mode: Bridge not connected",
            }

        payload = {
            "meetingLink": meetingLink,
            "botName": botName,
            "backgroundImageUrl": backgroundImageUrl,
            "sessionId": sessionId,
            "gatewayCallbackUrl": gatewayCallbackUrl,
            "gatewayWsUrl": gatewayWsUrl,
            "gatewayBaseUrl": gatewayBaseUrl,
        }

        try:
            async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session:
                async with session.post(f"{self.bridgeUrl}/bridge/join", json=payload) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return {
                            "success": True,
                            "bridgeSessionId": data.get("bridgeSessionId"),
                        }
                    errorText = await resp.text()
                    logger.error("Bridge join failed: %s - %s", resp.status, errorText)
                    return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"}
        except aiohttp.ClientError as e:
            logger.error("Bridge connection error: %s", e)
            return {"success": False, "error": f"Bridge connection failed: {str(e)}"}
        except Exception as e:
            logger.error("Bridge join error: %s", e)
            return {"success": False, "error": str(e)}

    async def leaveMeeting(self, sessionId: str) -> Dict[str, Any]:
        """Send the leave command to the .NET Media Bridge.

        Args:
            sessionId: Session ID placed in the request path.

        Returns:
            Dict with 'success' bool and, on failure, an 'error' string.
            Without a configured bridge URL a simulated success is returned.
        """
        if not self._isConfigured():
            logger.warning("Bridge URL not configured. Simulating leave for development.")
            return {"success": True, "message": "Development mode: Bridge not connected"}

        from urllib.parse import quote

        # Percent-encode the ID so characters such as "/" or "?" cannot alter the request path.
        encodedSessionId = quote(str(sessionId), safe="")

        try:
            async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session:
                async with session.post(f"{self.bridgeUrl}/bridge/leave/{encodedSessionId}") as resp:
                    if resp.status == 200:
                        return {"success": True}
                    errorText = await resp.text()
                    logger.error("Bridge leave failed: %s - %s", resp.status, errorText)
                    return {"success": False, "error": f"Bridge returned {resp.status}: {errorText}"}
        except Exception as e:
            logger.error("Bridge leave error: %s", e)
            return {"success": False, "error": str(e)}

    async def getStatus(self) -> Dict[str, Any]:
        """Get bridge health and active sessions.

        Returns:
            The bridge's JSON payload on HTTP 200. Otherwise a dict with
            'healthy' False and either a 'message' (not configured) or an
            'error' string.
        """
        if not self._isConfigured():
            return {"healthy": False, "message": "Bridge URL not configured"}

        try:
            async with aiohttp.ClientSession(timeout=_BRIDGE_TIMEOUT) as session:
                async with session.get(f"{self.bridgeUrl}/bridge/status") as resp:
                    if resp.status == 200:
                        return await resp.json()
                    # Log the failure so an unhealthy bridge is visible, as for join/leave.
                    logger.warning("Bridge status check failed: %s", resp.status)
                    return {"healthy": False, "error": f"Bridge returned {resp.status}"}
        except Exception as e:
            logger.warning("Bridge status error: %s", e)
            return {"healthy": False, "error": str(e)}

View file

@ -0,0 +1,61 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Feature - Configuration utilities.
Loads and manages bot configuration from FeatureInstance.config.
"""
import logging
from typing import Optional, Dict, Any
from .datamodelTeamsbot import TeamsbotConfig
logger = logging.getLogger(__name__)
def loadConfigFromInstance(instanceId: str) -> TeamsbotConfig:
    """
    Build a TeamsbotConfig from the ``config`` of a FeatureInstance.

    The instance may be a dict or an object with a ``config`` attribute. If the
    config dict has a "teamsbot" key, that nested value is used; otherwise the
    whole dict is used. A default TeamsbotConfig is returned when the instance
    is missing, the config is empty or not a dict, or any error occurs.
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        features = getFeatureInterface(getRootInterface().db)
        featureInstance = features.getFeatureInstance(instanceId)
        if not featureInstance:
            logger.warning(f"FeatureInstance '{instanceId}' not found, using default config")
            return TeamsbotConfig()

        if isinstance(featureInstance, dict):
            rawConfig = featureInstance.get("config")
        else:
            rawConfig = getattr(featureInstance, "config", None)

        if rawConfig and isinstance(rawConfig, dict):
            # Prefer the teamsbot-specific section when the config is nested.
            return TeamsbotConfig(**rawConfig.get("teamsbot", rawConfig))
    except Exception as e:
        logger.error(f"Error loading teamsbot config for instance '{instanceId}': {e}")

    # Nothing usable was found (or loading failed): fall back to defaults.
    return TeamsbotConfig()
def saveConfigToInstance(instanceId: str, config: TeamsbotConfig) -> bool:
    """
    Persist a TeamsbotConfig on a FeatureInstance.

    The config is dumped with ``model_dump()`` and passed to
    ``updateFeatureInstanceConfig``. Returns True on success and False if
    anything raised; the error is logged.
    """
    # NOTE(review): the dump is written flat, while loadConfigFromInstance also
    # accepts a nested "teamsbot" section. Confirm how updateFeatureInstanceConfig merges.
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.interfaces.interfaceFeatures import getFeatureInterface

        features = getFeatureInterface(getRootInterface().db)
        features.updateFeatureInstanceConfig(instanceId, config.model_dump())
    except Exception as e:
        logger.error(f"Error saving teamsbot config for instance '{instanceId}': {e}")
        return False
    else:
        logger.info(f"Teamsbot config saved for instance '{instanceId}'")
        return True

View file

@ -0,0 +1,196 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Feature - Data Models.
Pydantic models for Teams Bot sessions, transcripts, bot responses, and configuration.
"""
from typing import Optional, List, Dict, Any
from pydantic import BaseModel, Field
from enum import Enum
import uuid
# ============================================================================
# Enums
# ============================================================================
class TeamsbotSessionStatus(str, Enum):
    """Lifecycle states of a Teams Bot session."""

    # Session created, waiting for bridge
    PENDING = "pending"
    # Bridge is joining the meeting
    JOINING = "joining"
    # Bot is in the meeting and processing audio
    ACTIVE = "active"
    # Bot is leaving the meeting
    LEAVING = "leaving"
    # Session completed normally
    ENDED = "ended"
    # Session ended with an error
    ERROR = "error"
class TeamsbotResponseType(str, Enum):
    """Channel(s) through which a bot response is delivered."""

    # Voice response only
    AUDIO = "audio"
    # Chat message only
    CHAT = "chat"
    # Voice + chat message
    BOTH = "both"
class TeamsbotDetectedIntent(str, Enum):
    """Intent reported by the SPEECH_TEAMS AI handler."""

    # Bot was directly addressed
    ADDRESSED = "addressed"
    # A general question was asked
    QUESTION = "question"
    # Bot has a valuable proactive contribution
    PROACTIVE = "proactive"
    # No action needed
    NONE = "none"
class TeamsbotResponseMode(str, Enum):
    """Operating mode that controls when the bot responds."""

    # Fully automatic: AI decides when to respond
    AUTO = "auto"
    # User triggers responses manually from UI
    MANUAL = "manual"
    # Only transcribe, no AI responses
    TRANSCRIBE_ONLY = "transcribeOnly"
# ============================================================================
# Database Models (stored in PostgreSQL)
# ============================================================================
class TeamsbotSession(BaseModel):
    """Record describing one Teams Bot meeting session."""

    # Identity and ownership
    id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Session ID")
    instanceId: str = Field(..., description="Feature instance ID (FK)")
    mandateId: str = Field(..., description="Mandate ID (FK)")

    # Meeting parameters
    meetingLink: str = Field(..., description="Teams meeting join link")
    botName: str = Field("AI Assistant", description="Display name of the bot in the meeting")
    backgroundImageUrl: Optional[str] = Field(None, description="Background image URL for the bot's video feed")

    # Lifecycle
    status: TeamsbotSessionStatus = Field(TeamsbotSessionStatus.PENDING, description="Current session status")
    startedAt: Optional[str] = Field(None, description="ISO timestamp when session started")
    endedAt: Optional[str] = Field(None, description="ISO timestamp when session ended")
    startedByUserId: str = Field(..., description="User ID who started the session")

    # External references
    bridgeSessionId: Optional[str] = Field(None, description="Session ID on the .NET Media Bridge")
    meetingChatId: Optional[str] = Field(None, description="Teams meeting chat ID for Graph API messages")

    # Outcome
    summary: Optional[str] = Field(None, description="AI-generated meeting summary (after session ends)")
    errorMessage: Optional[str] = Field(None, description="Error message if status is ERROR")

    # Counters
    transcriptSegmentCount: int = Field(0, description="Number of transcript segments in this session")
    botResponseCount: int = Field(0, description="Number of bot responses in this session")

    # Bookkeeping timestamps
    creationDate: Optional[str] = Field(None, description="ISO timestamp of record creation")
    lastModified: Optional[str] = Field(None, description="ISO timestamp of last modification")
class TeamsbotTranscript(BaseModel):
    """One transcribed speech segment belonging to a session."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Transcript segment ID")
    sessionId: str = Field(..., description="Session ID (FK)")
    speaker: Optional[str] = Field(None, description="Speaker name or identifier")
    text: str = Field(..., description="Transcribed text")
    timestamp: str = Field(..., description="ISO timestamp of the speech segment")
    # Confidence is validated to lie within [0.0, 1.0].
    confidence: float = Field(0.0, ge=0.0, le=1.0, description="STT confidence score")
    language: Optional[str] = Field(None, description="Detected language code (e.g., de-DE)")
    isFinal: bool = Field(True, description="Whether this is a final or interim result")
    creationDate: Optional[str] = Field(None, description="ISO timestamp of record creation")
class TeamsbotBotResponse(BaseModel):
    """Record of a single response the bot produced during a session."""

    id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Response ID")
    sessionId: str = Field(..., description="Session ID (FK)")

    # What was said and how it was delivered
    responseText: str = Field(..., description="The bot's response text")
    responseType: TeamsbotResponseType = Field(TeamsbotResponseType.BOTH, description="How the response was delivered")

    # Why the bot responded
    detectedIntent: TeamsbotDetectedIntent = Field(TeamsbotDetectedIntent.NONE, description="What triggered the response")
    reasoning: Optional[str] = Field(None, description="AI reasoning for why it responded")
    triggeredByTranscriptId: Optional[str] = Field(None, description="Transcript segment that triggered this response")

    # AI call metadata
    modelName: Optional[str] = Field(None, description="AI model used for this response")
    processingTime: float = Field(0.0, description="Processing time in seconds")
    priceCHF: float = Field(0.0, description="Cost of this AI call in CHF")

    timestamp: Optional[str] = Field(None, description="ISO timestamp of the response")
    creationDate: Optional[str] = Field(None, description="ISO timestamp of record creation")
# ============================================================================
# Configuration Model (stored in FeatureInstance.config JSONB)
# ============================================================================
class TeamsbotConfig(BaseModel):
    """Per-instance settings of the Teams Bot feature."""

    # Appearance
    botName: str = Field("AI Assistant", description="Default bot display name")
    backgroundImageUrl: Optional[str] = Field(None, description="Default background image URL")

    # AI behaviour
    aiSystemPrompt: str = Field(
        "Du bist ein hilfreicher Meeting-Assistent. Fasse wichtige Punkte zusammen und beantworte Fragen sachlich.",
        description="Custom system prompt for the AI analysis",
    )
    responseMode: TeamsbotResponseMode = Field(TeamsbotResponseMode.AUTO, description="How the bot responds")

    # Speech
    language: str = Field("de-DE", description="Primary language for STT/TTS")
    voiceId: Optional[str] = Field(None, description="Google TTS voice ID (e.g., de-DE-Standard-A)")

    # Bridge
    bridgeUrl: Optional[str] = Field(
        None,
        description="URL of the .NET Media Bridge service. Falls back to TEAMSBOT_BRIDGE_URL env variable if not set per-instance.",
    )

    # Trigger tuning (each value is range-validated)
    triggerIntervalSeconds: int = Field(10, ge=3, le=60, description="Seconds between periodic AI analysis triggers")
    triggerCooldownSeconds: int = Field(3, ge=1, le=30, description="Minimum seconds between AI calls")
    contextWindowSegments: int = Field(20, ge=5, le=100, description="Number of transcript segments to include in AI context")

    def _getEffectiveBridgeUrl(self) -> Optional[str]:
        """Return the bridge URL to use: the per-instance value, else APP_CONFIG's TEAMSBOT_BRIDGE_URL."""
        effectiveUrl = self.bridgeUrl
        if not effectiveUrl:
            # Imported lazily, only when the per-instance value is missing or empty.
            from modules.shared.configuration import APP_CONFIG
            effectiveUrl = APP_CONFIG.get("TEAMSBOT_BRIDGE_URL")
        return effectiveUrl
# ============================================================================
# API Request/Response Models
# ============================================================================
class TeamsbotStartSessionRequest(BaseModel):
    """Payload for starting a new Teams Bot session."""

    meetingLink: str = Field(
        ...,
        description="Teams meeting join link (e.g., https://teams.microsoft.com/l/meetup-join/...)",
    )
    # Optional per-session overrides
    botName: Optional[str] = Field(None, description="Override bot name for this session")
    backgroundImageUrl: Optional[str] = Field(None, description="Override background image for this session")
    connectionId: Optional[str] = Field(None, description="Microsoft connection ID for Graph API access")
class TeamsbotSessionResponse(BaseModel):
    """Session details, optionally bundled with transcripts and bot responses."""

    session: TeamsbotSession
    transcripts: Optional[List[TeamsbotTranscript]] = Field(
        None, description="Transcript segments (if requested)"
    )
    botResponses: Optional[List[TeamsbotBotResponse]] = Field(
        None, description="Bot responses (if requested)"
    )
class TeamsbotConfigUpdateRequest(BaseModel):
    """Request to update teamsbot configuration.

    Every field is optional and defaults to None. The numeric fields carry the
    same bounds as the corresponding TeamsbotConfig fields, so an out-of-range
    value is rejected when the request is validated.
    """

    botName: Optional[str] = None
    backgroundImageUrl: Optional[str] = None
    aiSystemPrompt: Optional[str] = None
    responseMode: Optional[TeamsbotResponseMode] = None
    language: Optional[str] = None
    voiceId: Optional[str] = None
    bridgeUrl: Optional[str] = None
    # Bounds mirror TeamsbotConfig; None is still accepted.
    triggerIntervalSeconds: Optional[int] = Field(default=None, ge=3, le=60)
    triggerCooldownSeconds: Optional[int] = Field(default=None, ge=1, le=30)
    contextWindowSegments: Optional[int] = Field(default=None, ge=5, le=100)
# ============================================================================
# SPEECH_TEAMS AI Response Model
# ============================================================================
class SpeechTeamsResponse(BaseModel):
    """Structured result returned by the SPEECH_TEAMS AI handler."""

    shouldRespond: bool = Field(..., description="Whether the bot should respond")
    responseText: Optional[str] = Field(
        None, description="The bot's response text (only if shouldRespond=True)"
    )
    reasoning: str = Field("", description="Reasoning for the decision (for logging/debug)")
    # Kept as a plain string rather than the TeamsbotDetectedIntent enum.
    detectedIntent: str = Field("none", description="Detected intent: addressed, question, proactive, none")
# ============================================================================
# Bridge Communication Models
# ============================================================================
class BridgeJoinRequest(BaseModel):
    """Join-meeting payload sent to the .NET Media Bridge."""

    # Meeting and bot presentation
    meetingLink: str = Field(..., description="Teams meeting join link")
    botName: str = Field(..., description="Bot display name")
    backgroundImageUrl: Optional[str] = Field(None, description="Background image URL")

    # Gateway endpoints and correlation
    gatewayCallbackUrl: str = Field(..., description="Gateway URL for bridge callbacks")
    gatewayWsUrl: str = Field(..., description="Gateway WebSocket URL for audio streaming")
    sessionId: str = Field(..., description="Session ID for correlation")
    gatewayBaseUrl: str = Field(
        ...,
        description="Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net)",
    )
class BridgeStatusResponse(BaseModel):
    """Health report returned by the .NET Media Bridge."""

    healthy: bool = Field(..., description="Whether the bridge is healthy")
    activeSessions: int = Field(0, description="Number of active meeting sessions")
    sessions: Optional[List[Dict[str, Any]]] = Field(
        None, description="Details of active sessions"
    )

View file

@ -0,0 +1,203 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Interface to Teamsbot database.
Uses the PostgreSQL connector for data access with user/mandate filtering.
"""
import logging
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelUam import User
from modules.connectors.connectorDbPostgre import DatabaseConnector
from modules.shared.timeUtils import getUtcTimestamp
from modules.shared.configuration import APP_CONFIG
from .datamodelTeamsbot import (
TeamsbotSession,
TeamsbotSessionStatus,
TeamsbotTranscript,
TeamsbotBotResponse,
)
logger = logging.getLogger(__name__)
# Singleton factory: cache of TeamsbotObjects instances created by getInterface,
# keyed by the user / mandate / feature-instance combination.
_interfaces: Dict[str, "TeamsbotObjects"] = {}
def getInterface(currentUser: User, mandateId: str = None, featureInstanceId: str = None):
    """Factory: get or create a TeamsbotObjects interface instance.

    Instances are cached in ``_interfaces`` per user / mandate / feature
    instance. On a cache hit the stored instance is refreshed with the given
    arguments before it is returned.

    Args:
        currentUser: User the interface acts for. May be None, in which case
            the "system" identity is used, matching TeamsbotObjects.__init__.
        mandateId: Optional mandate ID.
        featureInstanceId: Optional feature instance ID.

    Returns:
        The cached or newly created TeamsbotObjects instance.
    """
    # Use the same "system" fallback as TeamsbotObjects.__init__ so a missing
    # user does not raise AttributeError on ``currentUser.id``.
    userKey = currentUser.id if currentUser else "system"
    key = f"{userKey}_{mandateId}_{featureInstanceId}"

    interface = _interfaces.get(key)
    if interface is None:
        interface = TeamsbotObjects(currentUser, mandateId, featureInstanceId)
        _interfaces[key] = interface
    else:
        interface.currentUser = currentUser
        interface.mandateId = mandateId
        interface.featureInstanceId = featureInstanceId
    return interface
class TeamsbotObjects:
    """Database interface for Teams Bot feature.

    Wraps a ``DatabaseConnector`` for the ``poweron_teamsbot`` database and
    offers helpers for sessions, transcript segments and bot responses.
    Records are handled as plain dicts.
    """

    def __init__(self, currentUser: User, mandateId: str = None, featureInstanceId: str = None):
        """Create the interface and its database connector from APP_CONFIG.

        Args:
            currentUser: User the interface acts for; None maps to "system".
            mandateId: Optional mandate ID, stored on the instance.
            featureInstanceId: Optional feature instance ID, stored on the instance.
        """
        self.currentUser = currentUser
        self.mandateId = mandateId
        self.featureInstanceId = featureInstanceId
        self.userId = str(currentUser.id) if currentUser else "system"

        dbHost = APP_CONFIG.get("DB_HOST", "_no_config_default_data")
        dbDatabase = "poweron_teamsbot"
        dbUser = APP_CONFIG.get("DB_USER")
        dbPassword = APP_CONFIG.get("DB_PASSWORD_SECRET")
        dbPort = int(APP_CONFIG.get("DB_PORT", 5432))

        self.db = DatabaseConnector(
            dbHost=dbHost,
            dbDatabase=dbDatabase,
            dbUser=dbUser,
            dbPassword=dbPassword,
            dbPort=dbPort,
            userId=self.userId,
        )

    # =========================================================================
    # Sessions
    # =========================================================================

    def getSessions(self, instanceId: str, includeEnded: bool = True) -> List[Dict[str, Any]]:
        """Get all sessions for a feature instance, newest ``startedAt`` first.

        With ``includeEnded=False`` only sessions in status ENDED are dropped;
        sessions in any other status, including ERROR, are kept.
        """
        records = self.db.getRecordset(
            TeamsbotSession,
            recordFilter={"instanceId": instanceId},
        )
        if not includeEnded:
            records = [r for r in records if r.get("status") != TeamsbotSessionStatus.ENDED.value]
        # Sessions without a startedAt sort as "" and therefore come last.
        records.sort(key=lambda r: r.get("startedAt") or "", reverse=True)
        return records

    def getActiveSessions(self, instanceId: str) -> List[Dict[str, Any]]:
        """Get sessions whose status is PENDING, JOINING or ACTIVE.

        LEAVING, ENDED and ERROR sessions are excluded.
        """
        records = self.db.getRecordset(
            TeamsbotSession,
            recordFilter={"instanceId": instanceId},
        )
        activeStatuses = {
            TeamsbotSessionStatus.PENDING.value,
            TeamsbotSessionStatus.JOINING.value,
            TeamsbotSessionStatus.ACTIVE.value,
        }
        return [r for r in records if r.get("status") in activeStatuses]

    def getSession(self, sessionId: str) -> Optional[Dict[str, Any]]:
        """Get a single session by ID, or None when no record matches."""
        records = self.db.getRecordset(TeamsbotSession, recordFilter={"id": sessionId})
        return records[0] if records else None

    def createSession(self, sessionData: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new session.

        ``creationDate`` and ``lastModified`` are written into ``sessionData``
        (the caller's dict is modified) before the record is created.
        """
        # Take one timestamp so both fields hold exactly the same value.
        now = getUtcTimestamp()
        sessionData["creationDate"] = now
        sessionData["lastModified"] = now
        return self.db.recordCreate(TeamsbotSession, sessionData)

    def updateSession(self, sessionId: str, updates: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Update session fields; ``lastModified`` is refreshed in ``updates``."""
        updates["lastModified"] = getUtcTimestamp()
        return self.db.recordModify(TeamsbotSession, sessionId, updates)

    def deleteSession(self, sessionId: str) -> bool:
        """Delete a session and all related transcripts and responses."""
        # Delete related records first
        self._deleteTranscriptsBySession(sessionId)
        self._deleteResponsesBySession(sessionId)
        # Delete session
        return self.db.recordDelete(TeamsbotSession, sessionId)

    # =========================================================================
    # Transcripts
    # =========================================================================

    def getTranscripts(self, sessionId: str, limit: int = None, offset: int = None) -> List[Dict[str, Any]]:
        """Get transcript segments for a session, ordered by timestamp.

        ``offset`` is applied before ``limit``. A value of None or 0 for either
        parameter means "not set".
        """
        records = self.db.getRecordset(
            TeamsbotTranscript,
            recordFilter={"sessionId": sessionId},
        )
        records.sort(key=lambda r: r.get("timestamp") or "")
        if offset:
            records = records[offset:]
        if limit:
            records = records[:limit]
        return records

    def getRecentTranscripts(self, sessionId: str, count: int = 20) -> List[Dict[str, Any]]:
        """Get the most recent N transcript segments for context building.

        Segments are returned in ascending timestamp order. A ``count`` of zero
        or less yields an empty list.
        """
        # ``records[-0:]`` would be the whole list, so handle non-positive counts up front.
        if count <= 0:
            return []
        records = self.db.getRecordset(
            TeamsbotTranscript,
            recordFilter={"sessionId": sessionId},
        )
        records.sort(key=lambda r: r.get("timestamp") or "")
        return records[-count:]

    def createTranscript(self, transcriptData: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new transcript segment; ``creationDate`` is set in ``transcriptData``."""
        transcriptData["creationDate"] = getUtcTimestamp()
        return self.db.recordCreate(TeamsbotTranscript, transcriptData)

    def _deleteTranscriptsBySession(self, sessionId: str) -> int:
        """Delete all transcripts for a session and return how many deletes were issued."""
        records = self.db.getRecordset(TeamsbotTranscript, recordFilter={"sessionId": sessionId})
        count = 0
        for record in records:
            self.db.recordDelete(TeamsbotTranscript, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Bot Responses
    # =========================================================================

    def getBotResponses(self, sessionId: str) -> List[Dict[str, Any]]:
        """Get all bot responses for a session, ordered by timestamp."""
        records = self.db.getRecordset(
            TeamsbotBotResponse,
            recordFilter={"sessionId": sessionId},
        )
        records.sort(key=lambda r: r.get("timestamp") or "")
        return records

    def createBotResponse(self, responseData: Dict[str, Any]) -> Dict[str, Any]:
        """Create a new bot response record; ``creationDate`` is set in ``responseData``."""
        responseData["creationDate"] = getUtcTimestamp()
        return self.db.recordCreate(TeamsbotBotResponse, responseData)

    def _deleteResponsesBySession(self, sessionId: str) -> int:
        """Delete all bot responses for a session and return how many deletes were issued."""
        records = self.db.getRecordset(TeamsbotBotResponse, recordFilter={"sessionId": sessionId})
        count = 0
        for record in records:
            self.db.recordDelete(TeamsbotBotResponse, record.get("id"))
            count += 1
        return count

    # =========================================================================
    # Stats / Aggregation
    # =========================================================================

    def getSessionStats(self, sessionId: str) -> Dict[str, Any]:
        """Get aggregated statistics for a session.

        Returns:
            Dict with the transcript and response counts, the summed cost
            (rounded to 4 decimals), the summed processing time (rounded to
            2 decimals) and the sorted list of distinct speakers.
        """
        transcripts = self.db.getRecordset(TeamsbotTranscript, recordFilter={"sessionId": sessionId})
        responses = self.db.getRecordset(TeamsbotBotResponse, recordFilter={"sessionId": sessionId})

        # ``or 0`` also covers records where the key is present but holds None.
        totalCost = sum((r.get("priceCHF") or 0) for r in responses)
        totalProcessingTime = sum((r.get("processingTime") or 0) for r in responses)

        # Sort so the speaker list is deterministic rather than in set order.
        speakers = sorted({t.get("speaker") for t in transcripts if t.get("speaker")}, key=str)

        return {
            "transcriptSegments": len(transcripts),
            "botResponses": len(responses),
            "totalCostCHF": round(totalCost, 4),
            "totalProcessingTime": round(totalProcessingTime, 2),
            "speakers": speakers,
        }

View file

@ -0,0 +1,282 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Feature Container - Main Module.
Handles feature initialization and RBAC catalog registration.
"""
import logging
from typing import Dict, List, Any
logger = logging.getLogger(__name__)
# Feature metadata: code, localized label and icon returned by getFeatureDefinition().
FEATURE_CODE = "teamsbot"
# The label is identical in every listed language.
FEATURE_LABEL = {languageCode: "Teams Bot" for languageCode in ("en", "de", "fr")}
FEATURE_ICON = "mdi-headset"
# UI objects for the RBAC catalog: one entry per UI area of the feature.
UI_OBJECTS = [
    {
        "objectKey": "ui.feature.teamsbot.dashboard",
        "label": {
            "en": "Dashboard",
            "de": "Dashboard",
            "fr": "Tableau de bord",
        },
        "meta": {"area": "dashboard"},
    },
    {
        "objectKey": "ui.feature.teamsbot.sessions",
        "label": {
            "en": "Sessions",
            "de": "Sitzungen",
            "fr": "Sessions",
        },
        "meta": {"area": "sessions"},
    },
    {
        "objectKey": "ui.feature.teamsbot.settings",
        "label": {
            "en": "Settings",
            "de": "Einstellungen",
            "fr": "Paramètres",
        },
        # The settings area is additionally flagged as admin-only.
        "meta": {
            "area": "settings",
            "admin_only": True,
        },
    },
]
# DATA objects for the RBAC catalog (tables/entities), plus one wildcard entry.
DATA_OBJECTS = [
    {
        "objectKey": "data.feature.teamsbot.TeamsbotSession",
        "label": {"en": "Session", "de": "Sitzung", "fr": "Session"},
        "meta": {
            "table": "TeamsbotSession",
            "fields": ["id", "meetingLink", "botName", "status", "startedAt", "endedAt"],
        },
    },
    {
        "objectKey": "data.feature.teamsbot.TeamsbotTranscript",
        "label": {"en": "Transcript", "de": "Transkript", "fr": "Transcription"},
        "meta": {
            "table": "TeamsbotTranscript",
            "fields": ["id", "sessionId", "speaker", "text", "timestamp"],
        },
    },
    {
        "objectKey": "data.feature.teamsbot.TeamsbotBotResponse",
        "label": {"en": "Bot Response", "de": "Bot-Antwort", "fr": "Réponse du bot"},
        "meta": {
            "table": "TeamsbotBotResponse",
            "fields": ["id", "sessionId", "responseText", "detectedIntent"],
        },
    },
    {
        "objectKey": "data.feature.teamsbot.*",
        "label": {
            "en": "All Teams Bot Data",
            "de": "Alle Teams Bot Daten",
            "fr": "Toutes les données Teams Bot",
        },
        "meta": {
            "wildcard": True,
            "description": "Wildcard for all teamsbot data tables",
        },
    },
]
# Resource objects for the RBAC catalog: each entry names an endpoint and HTTP method.
RESOURCE_OBJECTS = [
    {
        "objectKey": "resource.feature.teamsbot.session.start",
        "label": {"en": "Start Session", "de": "Sitzung starten", "fr": "Démarrer session"},
        "meta": {
            "endpoint": "/api/teamsbot/{instanceId}/sessions",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.teamsbot.session.stop",
        "label": {"en": "Stop Session", "de": "Sitzung beenden", "fr": "Arrêter session"},
        "meta": {
            "endpoint": "/api/teamsbot/{instanceId}/sessions/{sessionId}/stop",
            "method": "POST",
        },
    },
    {
        "objectKey": "resource.feature.teamsbot.session.delete",
        "label": {"en": "Delete Session", "de": "Sitzung löschen", "fr": "Supprimer session"},
        "meta": {
            "endpoint": "/api/teamsbot/{instanceId}/sessions/{sessionId}",
            "method": "DELETE",
        },
    },
    {
        "objectKey": "resource.feature.teamsbot.config.edit",
        "label": {
            "en": "Edit Configuration",
            "de": "Konfiguration bearbeiten",
            "fr": "Modifier configuration",
        },
        # Editing the configuration is additionally flagged as admin-only.
        "meta": {
            "endpoint": "/api/teamsbot/{instanceId}/config",
            "method": "PUT",
            "admin_only": True,
        },
    },
]
# Template roles for this feature with AccessRules.
# The rule lists are named separately so each role definition stays short.

_TEAMSBOT_ADMIN_ACCESS_RULES = [
    # Full UI access (all views including settings)
    {"context": "UI", "item": None, "view": True},
    # Full DATA access
    {"context": "DATA", "item": None, "view": True, "read": "a", "create": "a", "update": "a", "delete": "a"},
    # All resources
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.start", "view": True},
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.stop", "view": True},
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.delete", "view": True},
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.config.edit", "view": True},
]

_TEAMSBOT_USER_ACCESS_RULES = [
    # UI access to dashboard and sessions (not settings)
    {"context": "UI", "item": "ui.feature.teamsbot.dashboard", "view": True},
    {"context": "UI", "item": "ui.feature.teamsbot.sessions", "view": True},
    # Own records only
    {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotSession", "view": True, "read": "m", "create": "m", "update": "m", "delete": "n"},
    {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotTranscript", "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
    {"context": "DATA", "item": "data.feature.teamsbot.TeamsbotBotResponse", "view": True, "read": "m", "create": "n", "update": "n", "delete": "n"},
    # Start and stop sessions
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.start", "view": True},
    {"context": "RESOURCE", "item": "resource.feature.teamsbot.session.stop", "view": True},
]

TEMPLATE_ROLES = [
    {
        "roleLabel": "teamsbot-admin",
        "description": {
            "en": "Teams Bot Administrator - Full access to all sessions and settings",
            "de": "Teams Bot Administrator - Vollzugriff auf alle Sitzungen und Einstellungen",
            "fr": "Administrateur Teams Bot - Accès complet aux sessions et paramètres",
        },
        "accessRules": _TEAMSBOT_ADMIN_ACCESS_RULES,
    },
    {
        "roleLabel": "teamsbot-user",
        "description": {
            "en": "Teams Bot User - Can start/stop sessions and view transcripts",
            "de": "Teams Bot Benutzer - Kann Sitzungen starten/stoppen und Transkripte einsehen",
            "fr": "Utilisateur Teams Bot - Peut démarrer/arrêter des sessions et voir les transcriptions",
        },
        "accessRules": _TEAMSBOT_USER_ACCESS_RULES,
    },
]
def getFeatureDefinition() -> Dict[str, Any]:
    """Build the registration dict for this feature from its code, label and icon."""
    definition: Dict[str, Any] = {}
    definition["code"] = FEATURE_CODE
    definition["label"] = FEATURE_LABEL
    definition["icon"] = FEATURE_ICON
    return definition
def getUiObjects() -> List[Dict[str, Any]]:
    """Return UI objects for RBAC catalog registration.

    Returns:
        The module-level ``UI_OBJECTS`` list itself (not a copy).
    """
    return UI_OBJECTS
def getResourceObjects() -> List[Dict[str, Any]]:
    """Return resource objects for RBAC catalog registration.

    Returns:
        The module-level ``RESOURCE_OBJECTS`` list itself (not a copy).
    """
    return RESOURCE_OBJECTS
def getTemplateRoles() -> List[Dict[str, Any]]:
    """Return template roles for this feature.

    Returns:
        The module-level ``TEMPLATE_ROLES`` list itself (not a copy). Each entry
        carries a ``roleLabel``, a localized ``description`` and ``accessRules``.
    """
    return TEMPLATE_ROLES
def getDataObjects() -> List[Dict[str, Any]]:
    """Return DATA objects for RBAC catalog registration.

    Returns:
        The module-level ``DATA_OBJECTS`` list itself (not a copy).
    """
    return DATA_OBJECTS
def registerFeature(catalogService) -> bool:
    """Register the feature's UI, resource and data objects, then sync template roles.

    Args:
        catalogService: Catalog exposing ``registerUiObject``,
            ``registerResourceObject`` and ``registerDataObject``.

    Returns:
        True when every step completed; False if any step raised (the error is
        logged, not propagated).
    """
    # Registrar method name paired with the catalog entries it handles,
    # in the order they must be registered.
    registrationPlan = (
        ("registerUiObject", UI_OBJECTS),
        ("registerResourceObject", RESOURCE_OBJECTS),
        ("registerDataObject", DATA_OBJECTS),
    )
    try:
        for registrarName, catalogEntries in registrationPlan:
            for entry in catalogEntries:
                # Resolve the registrar lazily so a missing method only fails
                # when an entry of that kind actually has to be registered.
                getattr(catalogService, registrarName)(
                    featureCode=FEATURE_CODE,
                    objectKey=entry["objectKey"],
                    label=entry["label"],
                    meta=entry.get("meta")
                )
        _syncTemplateRolesToDb()
        logger.info(f"Feature '{FEATURE_CODE}' registered {len(UI_OBJECTS)} UI, {len(RESOURCE_OBJECTS)} resource, {len(DATA_OBJECTS)} data objects")
    except Exception as e:
        logger.error(f"Failed to register feature '{FEATURE_CODE}': {e}")
        return False
    return True
def _syncTemplateRolesToDb() -> int:
    """Create missing template roles in the database and top up their AccessRules.

    Roles already stored for this feature with ``mandateId is None`` are matched
    by ``roleLabel``; for those only missing AccessRules are added.

    Returns:
        Number of roles newly created. 0 is also returned when an error occurs
        (the error is logged, not propagated).
    """
    try:
        from modules.interfaces.interfaceDbApp import getRootInterface
        from modules.datamodels.datamodelRbac import Role, AccessRule, AccessRuleContext
        rootInterface = getRootInterface()
        # roleLabel -> role id (as string) for roles not bound to a mandate.
        templateIdByLabel = {
            storedRole.roleLabel: str(storedRole.id)
            for storedRole in rootInterface.getRolesByFeatureCode(FEATURE_CODE)
            if storedRole.mandateId is None
        }
        createdCount = 0
        for roleTemplate in TEMPLATE_ROLES:
            roleLabel = roleTemplate["roleLabel"]
            if roleLabel in templateIdByLabel:
                # Role exists: only make sure its rules are complete.
                _ensureAccessRulesForRole(
                    rootInterface,
                    templateIdByLabel[roleLabel],
                    roleTemplate.get("accessRules", []),
                )
                continue
            roleModel = Role(
                roleLabel=roleLabel,
                description=roleTemplate.get("description", {}),
                featureCode=FEATURE_CODE,
                mandateId=None,
                featureInstanceId=None,
                isSystemRole=False
            )
            storedRecord = rootInterface.db.recordCreate(Role, roleModel.model_dump())
            roleId = storedRecord.get("id")
            _ensureAccessRulesForRole(rootInterface, roleId, roleTemplate.get("accessRules", []))
            logger.info(f"Created template role '{roleLabel}' with ID {roleId}")
            createdCount += 1
        if createdCount > 0:
            logger.info(f"Feature '{FEATURE_CODE}': Created {createdCount} template roles")
        return createdCount
    except Exception as e:
        logger.error(f"Error syncing template roles for feature '{FEATURE_CODE}': {e}")
        return 0
def _ensureAccessRulesForRole(rootInterface, roleId: str, ruleTemplates: List[Dict[str, Any]]) -> int:
    """Create every templated AccessRule the role does not have yet.

    A rule counts as present when a stored rule of the role has the same
    ``(context, item)`` pair; other fields of existing rules are not compared
    or updated.

    Args:
        rootInterface: Interface providing ``getAccessRulesByRole`` and ``db.recordCreate``.
        roleId: Role the rules belong to.
        ruleTemplates: Rule dicts with ``context``, ``item``, ``view`` and the
            optional ``read``/``create``/``update``/``delete`` values.

    Returns:
        Number of AccessRules created.
    """
    from modules.datamodels.datamodelRbac import AccessRule, AccessRuleContext
    existingSignatures = {
        (storedRule.context.value if storedRule.context else None, storedRule.item)
        for storedRule in rootInterface.getAccessRulesByRole(roleId)
    }
    # Known context names; anything else is handed to AccessRule unchanged.
    contextByName = {
        "UI": AccessRuleContext.UI,
        "DATA": AccessRuleContext.DATA,
        "RESOURCE": AccessRuleContext.RESOURCE,
    }
    createdCount = 0
    for template in ruleTemplates:
        context = template.get("context", "UI")
        item = template.get("item")
        if (context, item) in existingSignatures:
            continue
        ruleModel = AccessRule(
            roleId=roleId,
            context=contextByName.get(context, context),
            item=item,
            view=template.get("view", False),
            read=template.get("read"),
            create=template.get("create"),
            update=template.get("update"),
            delete=template.get("delete"),
        )
        rootInterface.db.recordCreate(AccessRule, ruleModel.model_dump())
        createdCount += 1
    if createdCount > 0:
        logger.debug(f"Created {createdCount} AccessRules for role {roleId}")
    return createdCount

View file

@ -0,0 +1,418 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot routes for the backend API.
Implements Teams Bot session management, live streaming, and configuration endpoints.
"""
import logging
import json
import asyncio
from typing import Optional
from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, Request
from fastapi.responses import StreamingResponse
# Import auth modules
from modules.auth import limiter, getRequestContext, RequestContext
# Import interfaces
from . import interfaceFeatureTeamsbot as interfaceDb
from modules.interfaces.interfaceDbApp import getRootInterface
from modules.interfaces.interfaceFeatures import getFeatureInterface
# Import models
from .datamodelTeamsbot import (
TeamsbotSession,
TeamsbotSessionStatus,
TeamsbotStartSessionRequest,
TeamsbotSessionResponse,
TeamsbotConfigUpdateRequest,
TeamsbotConfig,
)
# Import service
from .service import TeamsbotService
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Create router
# Every route declared on this router is served under the /api/teamsbot prefix
# and tagged "Teamsbot"; 404 is documented as a common response.
router = APIRouter(
    prefix="/api/teamsbot",
    tags=["Teamsbot"],
    responses={404: {"description": "Not found"}}
)
# =========================================================================
# Helpers
# =========================================================================
def _getInterface(context: RequestContext, instanceId: Optional[str] = None):
    """Build the teamsbot DB interface for the requesting user.

    Args:
        context: Request context supplying the user and (optionally) a mandate.
        instanceId: Feature instance to scope the interface to, if any.

    Returns:
        Whatever ``interfaceDb.getInterface`` returns for these arguments.
    """
    mandateId = None
    if context.mandateId:
        # The interface receives the mandate id as a string, or None when unset.
        mandateId = str(context.mandateId)
    return interfaceDb.getInterface(context.user, mandateId=mandateId, featureInstanceId=instanceId)
def _validateInstanceAccess(instanceId: str, context: RequestContext) -> str:
    """Resolve the feature instance and return its mandateId as a string.

    NOTE(review): ``context`` is accepted but never consulted here, so this only
    verifies that the instance exists and has a mandate. It does not check that
    the requesting user may access it -- confirm whether that check happens
    elsewhere.

    Raises:
        HTTPException: 404 if the instance does not exist, 500 if it has no mandateId.
    """
    featureInterface = getFeatureInterface(getRootInterface().db)
    instance = featureInterface.getFeatureInstance(instanceId)
    if not instance:
        raise HTTPException(status_code=404, detail=f"Feature instance '{instanceId}' not found")
    # The instance may come back as a plain dict or as an object.
    if isinstance(instance, dict):
        mandateId = instance.get("mandateId")
    else:
        mandateId = getattr(instance, "mandateId", None)
    if mandateId:
        return str(mandateId)
    raise HTTPException(status_code=500, detail="Feature instance has no mandateId")
def _getInstanceConfig(instanceId: str) -> TeamsbotConfig:
    """Load TeamsbotConfig from the feature instance's ``config`` field.

    Falls back to a default ``TeamsbotConfig()`` when the instance is missing,
    has no (dict) config, or the stored config cannot be turned into a
    ``TeamsbotConfig``. The last case is logged so a broken stored config does
    not silently behave like "no config".

    Args:
        instanceId: Feature instance whose config is loaded.

    Returns:
        The stored configuration, or defaults.
    """
    rootInterface = getRootInterface()
    featureInterface = getFeatureInterface(rootInterface.db)
    instance = featureInterface.getFeatureInstance(instanceId)
    if not instance:
        return TeamsbotConfig()
    # The instance may come back as a plain dict or as an object.
    if isinstance(instance, dict):
        configDict = instance.get("config")
    else:
        configDict = getattr(instance, "config", None)
    if configDict and isinstance(configDict, dict):
        try:
            return TeamsbotConfig(**configDict)
        except Exception as e:
            # Best effort: keep serving with defaults, but make the problem visible.
            logger.warning(
                f"Invalid teamsbot config for instance {instanceId}, falling back to defaults: {e}"
            )
    return TeamsbotConfig()
# =========================================================================
# Session Endpoints
# =========================================================================
# Strong references to fire-and-forget join tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes. Entries remove themselves when done.
_pendingJoinTasks: set = set()


@router.post("/{instanceId}/sessions")
@limiter.limit("10/minute")
async def startSession(
    request: Request,
    instanceId: str,
    body: TeamsbotStartSessionRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Start a new Teams Bot session -- bot joins the specified meeting.

    Creates the session record in PENDING state and schedules the actual join
    as a background task, so the response returns before the bot has joined.

    Returns:
        ``{"session": <created session record>}``.

    Raises:
        HTTPException: propagated from ``_validateInstanceAccess`` (404/500).
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    config = _getInstanceConfig(instanceId)
    # Create session; request values win over the instance configuration.
    sessionData = TeamsbotSession(
        instanceId=instanceId,
        mandateId=mandateId,
        meetingLink=body.meetingLink,
        botName=body.botName or config.botName,
        backgroundImageUrl=body.backgroundImageUrl or config.backgroundImageUrl,
        status=TeamsbotSessionStatus.PENDING,
        startedByUserId=str(context.user.id),
    ).model_dump()
    createdSession = interface.createSession(sessionData)
    sessionId = createdSession.get("id")
    # Derive gateway base URL from the incoming request so the bridge
    # can build full callback/WS URLs targeting this specific gateway instance.
    gatewayBaseUrl = str(request.base_url).rstrip("/")
    # Start the bot in background (join meeting via bridge)
    service = TeamsbotService(context.user, mandateId, instanceId, config)
    joinTask = asyncio.create_task(
        service.joinMeeting(sessionId, body.meetingLink, body.connectionId, gatewayBaseUrl)
    )
    # Keep the task alive until it completes (see _pendingJoinTasks).
    _pendingJoinTasks.add(joinTask)
    joinTask.add_done_callback(_pendingJoinTasks.discard)
    logger.info(f"Teamsbot session {sessionId} created for instance {instanceId}")
    return {"session": createdSession}
@router.get("/{instanceId}/sessions")
@limiter.limit("30/minute")
async def listSessions(
    request: Request,
    instanceId: str,
    includeEnded: bool = Query(True, description="Include ended sessions"),
    context: RequestContext = Depends(getRequestContext),
):
    """Return the sessions of a feature instance as ``{"sessions": [...]}``.

    ``includeEnded`` is forwarded unchanged to the interface's ``getSessions``.
    """
    _validateInstanceAccess(instanceId, context)
    teamsbotDb = _getInterface(context, instanceId)
    return {"sessions": teamsbotDb.getSessions(instanceId, includeEnded=includeEnded)}
@router.get("/{instanceId}/sessions/{sessionId}")
@limiter.limit("30/minute")
async def getSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    includeTranscripts: bool = Query(True),
    includeResponses: bool = Query(True),
    context: RequestContext = Depends(getRequestContext),
):
    """Return one session plus its stats, optionally with transcripts and bot responses.

    Returns:
        Dict with ``session`` and ``stats``; ``transcripts`` and ``botResponses``
        are present only when the matching query flag is true.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    _validateInstanceAccess(instanceId, context)
    teamsbotDb = _getInterface(context, instanceId)
    sessionRecord = teamsbotDb.getSession(sessionId)
    if not sessionRecord:
        raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
    payload = {"session": sessionRecord}
    if includeTranscripts:
        payload["transcripts"] = teamsbotDb.getTranscripts(sessionId)
    if includeResponses:
        payload["botResponses"] = teamsbotDb.getBotResponses(sessionId)
    payload["stats"] = teamsbotDb.getSessionStats(sessionId)
    return payload
@router.get("/{instanceId}/sessions/{sessionId}/stream")
@limiter.limit("10/minute")
async def streamSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """
    SSE live stream for a session.
    Streams transcript segments and bot responses in real-time.
    Events: sessionState (once, first), then whatever is put on the session's
    event queue (e.g. transcript, botResponse, statusChange, error), plus a
    ping every 30 seconds of silence.

    The stream ends after a statusChange to "ended" or "error". If the session
    is already in one of those states when the stream is opened, only the
    initial sessionState is sent: no further events can arrive, and creating a
    queue for it would leave an entry that nothing removes.

    Raises:
        HTTPException: 404 when the session does not exist.
    """
    _validateInstanceAccess(instanceId, context)
    interface = _getInterface(context, instanceId)
    session = interface.getSession(sessionId)
    if not session:
        raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
    terminalStatuses = (TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value)

    async def _eventGenerator():
        """Generate SSE events from the session event queue."""
        from .service import _sessionEvents
        # Send initial session state
        yield f"data: {json.dumps({'type': 'sessionState', 'data': session})}\n\n"
        # Nothing more will happen on a finished session.
        if session.get("status") in terminalStatuses:
            return
        # Reuse the session's queue, or create it if the stream is opened
        # before the service has registered one.
        eventQueue = _sessionEvents.setdefault(sessionId, asyncio.Queue())
        try:
            while True:
                try:
                    event = await asyncio.wait_for(eventQueue.get(), timeout=30.0)
                    yield f"data: {json.dumps(event)}\n\n"
                    # Stop streaming if session ended
                    if event.get("type") == "statusChange" and event.get("data", {}).get("status") in ["ended", "error"]:
                        break
                except asyncio.TimeoutError:
                    # Send keepalive ping
                    yield f"data: {json.dumps({'type': 'ping'})}\n\n"
        except asyncio.CancelledError:
            # Cancellation simply ends the stream.
            pass

    return StreamingResponse(
        _eventGenerator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        }
    )
@router.post("/{instanceId}/sessions/{sessionId}/stop")
@limiter.limit("10/minute")
async def stopSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Ask the bot to leave the meeting of a session that is not yet finished.

    Returns:
        ``{"status": "stopping", "sessionId": <id>}`` once the leave call returned.

    Raises:
        HTTPException: 404 when the session does not exist, 400 when it is
            already ENDED or in ERROR state.
    """
    mandateId = _validateInstanceAccess(instanceId, context)
    teamsbotDb = _getInterface(context, instanceId)
    instanceConfig = _getInstanceConfig(instanceId)
    sessionRecord = teamsbotDb.getSession(sessionId)
    if not sessionRecord:
        raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
    currentStatus = sessionRecord.get("status")
    terminalStates = (TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value)
    if currentStatus in terminalStates:
        raise HTTPException(status_code=400, detail=f"Session already in terminal state: {currentStatus}")
    # Awaited (not backgrounded): the response is sent after the leave attempt.
    await TeamsbotService(context.user, mandateId, instanceId, instanceConfig).leaveMeeting(sessionId)
    logger.info(f"Teamsbot session {sessionId} stop requested")
    return {"status": "stopping", "sessionId": sessionId}
@router.delete("/{instanceId}/sessions/{sessionId}")
@limiter.limit("10/minute")
async def deleteSession(
    request: Request,
    instanceId: str,
    sessionId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Delete a session through the interface's ``deleteSession``.

    Returns:
        ``{"deleted": True, "sessionId": <id>}``.

    Raises:
        HTTPException: 404 when the session does not exist, 400 while it is
            ACTIVE or JOINING.
    """
    _validateInstanceAccess(instanceId, context)
    teamsbotDb = _getInterface(context, instanceId)
    sessionRecord = teamsbotDb.getSession(sessionId)
    if not sessionRecord:
        raise HTTPException(status_code=404, detail=f"Session '{sessionId}' not found")
    # Sessions with a bot in (or entering) the meeting must be stopped first.
    runningStates = (TeamsbotSessionStatus.ACTIVE.value, TeamsbotSessionStatus.JOINING.value)
    if sessionRecord.get("status") in runningStates:
        raise HTTPException(status_code=400, detail="Cannot delete an active session. Stop it first.")
    teamsbotDb.deleteSession(sessionId)
    logger.info(f"Teamsbot session {sessionId} deleted")
    return {"deleted": True, "sessionId": sessionId}
# =========================================================================
# Configuration Endpoints
# =========================================================================
@router.get("/{instanceId}/config")
@limiter.limit("30/minute")
async def getConfig(
    request: Request,
    instanceId: str,
    context: RequestContext = Depends(getRequestContext),
):
    """Return the instance's teamsbot configuration as ``{"config": {...}}``."""
    _validateInstanceAccess(instanceId, context)
    return {"config": _getInstanceConfig(instanceId).model_dump()}
@router.put("/{instanceId}/config")
@limiter.limit("10/minute")
async def updateConfig(
    request: Request,
    instanceId: str,
    configUpdate: TeamsbotConfigUpdateRequest,
    context: RequestContext = Depends(getRequestContext),
):
    """Merge the non-None fields of the request into the stored config and save it.

    Fields left as None in the request keep their current value, so a field
    cannot be reset to None through this endpoint.

    Returns:
        ``{"config": <merged configuration>}``.
    """
    _validateInstanceAccess(instanceId, context)
    storedConfig = _getInstanceConfig(instanceId)
    changedFields = configUpdate.model_dump(exclude_none=True)
    mergedConfig = storedConfig.model_copy(update=changedFields)
    # Save to FeatureInstance.config
    featureInterface = getFeatureInterface(getRootInterface().db)
    featureInterface.updateFeatureInstanceConfig(instanceId, mergedConfig.model_dump())
    logger.info(f"Teamsbot config updated for instance {instanceId}: {list(changedFields.keys())}")
    return {"config": mergedConfig.model_dump()}
# =========================================================================
# Bridge Communication Endpoints (called by .NET Media Bridge)
# =========================================================================
@router.post("/{instanceId}/bridge/status")
async def bridgeStatusCallback(
    instanceId: str,
    statusData: dict,
):
    """
    Callback endpoint for the .NET Media Bridge to report status updates.
    Called when: bot joins/leaves meeting, errors occur, etc.

    Expects ``sessionId`` and ``status`` in the body, optionally ``errorMessage``.
    ``status`` must be one of the ``TeamsbotSessionStatus`` values; anything else
    is rejected so arbitrary strings cannot be written to the session record.

    NOTE(review): no user auth and the bridge API key is not validated yet (see
    TODO below), so this endpoint is currently callable by anyone.

    Returns:
        ``{"received": True}``.

    Raises:
        HTTPException: 400 when sessionId/status is missing or status is unknown.
    """
    from modules.datamodels.datamodelUam import User
    from modules.shared.timeUtils import getUtcTimestamp
    from .service import _emitSessionEvent

    sessionId = statusData.get("sessionId")
    status = statusData.get("status")
    errorMessage = statusData.get("errorMessage")
    if not sessionId or not status:
        raise HTTPException(status_code=400, detail="Missing sessionId or status")
    # A list (not a set) so an unhashable JSON value fails the check cleanly.
    knownStatuses = [member.value for member in TeamsbotSessionStatus]
    if status not in knownStatuses:
        raise HTTPException(status_code=400, detail=f"Unknown status '{status}'")
    # TODO: Validate bridge API key from headers
    logger.info(f"Bridge status callback: session={sessionId}, status={status}")
    # Update session status on behalf of a synthetic system user.
    systemUser = User(id="system", email="system@internal")
    interface = interfaceDb.getInterface(systemUser, featureInstanceId=instanceId)
    updates = {"status": status}
    if errorMessage:
        updates["errorMessage"] = errorMessage
    if status == TeamsbotSessionStatus.ACTIVE.value:
        updates["startedAt"] = getUtcTimestamp()
    elif status in [TeamsbotSessionStatus.ENDED.value, TeamsbotSessionStatus.ERROR.value]:
        updates["endedAt"] = getUtcTimestamp()
    interface.updateSession(sessionId, updates)
    # Emit SSE event
    await _emitSessionEvent(sessionId, "statusChange", {"status": status, "errorMessage": errorMessage})
    return {"received": True}
@router.websocket("/{instanceId}/bridge/audio/{sessionId}")
async def bridgeAudioWebsocket(
    websocket: WebSocket,
    instanceId: str,
    sessionId: str,
):
    """
    Bidirectional WebSocket for audio streaming with the .NET Media Bridge.
    Bridge sends: PCM audio frames (LINEAR16, 16kHz, mono)
    Gateway sends: TTS audio responses

    The connection is accepted unconditionally and handed to
    ``TeamsbotService.handleAudioStream`` running as a synthetic system user
    without a mandate. Errors are logged here and not re-raised.
    """
    await websocket.accept()
    logger.info(f"Bridge audio WebSocket connected: session={sessionId}")
    # TODO: Validate bridge API key from headers/query params
    instanceConfig = _getInstanceConfig(instanceId)
    from modules.datamodels.datamodelUam import User
    bridgeUser = User(id="system", email="system@internal")
    audioService = TeamsbotService(bridgeUser, None, instanceId, instanceConfig)
    try:
        await audioService.handleAudioStream(websocket, sessionId)
    except WebSocketDisconnect:
        logger.info(f"Bridge audio WebSocket disconnected: session={sessionId}")
    except Exception as e:
        logger.error(f"Bridge audio WebSocket error: session={sessionId}, error={e}")
    finally:
        logger.info(f"Bridge audio WebSocket closed: session={sessionId}")

View file

@ -0,0 +1,549 @@
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Teamsbot Service - Pipeline Orchestrator.
Manages the audio processing pipeline: STT -> Context Buffer -> SPEECH_TEAMS -> TTS -> Bridge.
"""
import logging
import json
import asyncio
import time
import base64
from typing import Optional, Dict, Any, List
from fastapi import WebSocket
from modules.datamodels.datamodelUam import User
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum
from modules.shared.timeUtils import getUtcTimestamp
from .datamodelTeamsbot import (
TeamsbotSessionStatus,
TeamsbotTranscript,
TeamsbotBotResponse,
TeamsbotResponseType,
TeamsbotConfig,
TeamsbotResponseMode,
SpeechTeamsResponse,
)
from .bridgeConnector import BridgeConnector
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# =========================================================================
# Session Event Queues (for SSE streaming to frontend)
# =========================================================================
# Maps sessionId -> asyncio.Queue of event dicts. TeamsbotService.joinMeeting
# registers a queue, TeamsbotService.leaveMeeting removes it, and the SSE route
# (streamSession) creates one on demand if none exists yet. The dict lives in
# process memory only.
_sessionEvents: Dict[str, asyncio.Queue] = {}
async def _emitSessionEvent(sessionId: str, eventType: str, data: Any):
    """Queue an event for the session's SSE stream; a no-op if no queue is registered.

    The queued item is a dict with ``type``, ``data`` and a ``timestamp`` taken
    at emit time.
    """
    targetQueue = _sessionEvents.get(sessionId)
    if not targetQueue:
        return
    event = {"type": eventType, "data": data, "timestamp": getUtcTimestamp()}
    await targetQueue.put(event)
class TeamsbotService:
"""
Pipeline Orchestrator for Teams Bot sessions.
Coordinates VoiceObjects (STT/TTS), AiService (SPEECH_TEAMS), and Bridge communication.
"""
def __init__(self, currentUser: User, mandateId: Optional[str], instanceId: str, config: TeamsbotConfig):
    """Bind the service to a user, feature instance and configuration.

    Args:
        currentUser: User on whose behalf DB and voice interfaces are opened.
        mandateId: Mandate of the instance; None is passed by the bridge
            audio WebSocket route, which has no mandate at hand.
        instanceId: Feature instance this service works for.
        config: Teamsbot configuration; also supplies the bridge URL.
    """
    self.currentUser = currentUser
    self.mandateId = mandateId
    self.instanceId = instanceId
    self.config = config
    self.bridgeConnector = BridgeConnector(config._getEffectiveBridgeUrl())
    # State
    # time.time() of the last AI analysis; 0.0 means "never".
    self._lastAiCallTime: float = 0.0
    # Recent transcript segments (speaker/text/timestamp dicts) fed to the AI as context.
    self._contextBuffer: List[Dict[str, Any]] = []
# =========================================================================
# Session Lifecycle
# =========================================================================
async def joinMeeting(
    self,
    sessionId: str,
    meetingLink: str,
    connectionId: Optional[str] = None,
    gatewayBaseUrl: str = "",
):
    """Send join command to the .NET Media Bridge.

    Moves the session to JOINING, asks the bridge to join, then records ACTIVE
    (with the bridge session id) or ERROR. Every transition is also emitted as
    a ``statusChange`` event. Failures are logged and recorded on the session;
    they are not raised to the caller.

    Args:
        sessionId: Session to join for.
        meetingLink: Teams meeting link handed to the bridge.
        connectionId: Accepted for callers; not used inside this method.
        gatewayBaseUrl: Base URL of this gateway instance (e.g. https://gateway-prod.poweron-center.net).
            Passed to the bridge so it can build full callback/WS URLs per-session.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb
    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
    # Initialize SSE event queue. setdefault keeps a queue that an SSE client
    # may already have created and be waiting on; replacing it would leave that
    # client listening on a queue nobody writes to.
    _sessionEvents.setdefault(sessionId, asyncio.Queue())
    try:
        # Update status to JOINING
        interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.JOINING.value})
        await _emitSessionEvent(sessionId, "statusChange", {"status": "joining"})
        # Send join command to bridge
        session = interface.getSession(sessionId)
        if not session:
            raise ValueError(f"Session {sessionId} not found")
        result = await self.bridgeConnector.joinMeeting(
            meetingLink=meetingLink,
            botName=session.get("botName", self.config.botName),
            backgroundImageUrl=session.get("backgroundImageUrl"),
            sessionId=sessionId,
            # Relative paths; the bridge combines them with gatewayBaseUrl.
            gatewayCallbackUrl=f"/api/teamsbot/{self.instanceId}/bridge/status",
            gatewayWsUrl=f"/api/teamsbot/{self.instanceId}/bridge/audio/{sessionId}",
            gatewayBaseUrl=gatewayBaseUrl,
        )
        if result.get("success"):
            bridgeSessionId = result.get("bridgeSessionId")
            interface.updateSession(sessionId, {
                "bridgeSessionId": bridgeSessionId,
                "status": TeamsbotSessionStatus.ACTIVE.value,
                "startedAt": getUtcTimestamp(),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "active"})
            logger.info(f"Bot joined meeting for session {sessionId}")
        else:
            errorMsg = result.get("error", "Unknown error joining meeting")
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ERROR.value,
                "errorMessage": errorMsg,
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": errorMsg})
            logger.error(f"Failed to join meeting for session {sessionId}: {errorMsg}")
    except Exception as e:
        logger.error(f"Error joining meeting for session {sessionId}: {e}")
        interface.updateSession(sessionId, {
            "status": TeamsbotSessionStatus.ERROR.value,
            "errorMessage": str(e),
        })
        await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)})
async def leaveMeeting(self, sessionId: str):
    """Send leave command to the .NET Media Bridge.

    Moves the session to LEAVING, asks the bridge to leave, then records ENDED
    and schedules the meeting summary. On failure the session is set to ERROR
    and, like in joinMeeting, a ``statusChange`` error event is emitted so SSE
    clients learn about it instead of waiting on a removed queue. The session's
    event queue is unregistered in every case.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb
    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
    try:
        try:
            interface.updateSession(sessionId, {"status": TeamsbotSessionStatus.LEAVING.value})
            await _emitSessionEvent(sessionId, "statusChange", {"status": "leaving"})
            await self.bridgeConnector.leaveMeeting(sessionId)
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ENDED.value,
                "endedAt": getUtcTimestamp(),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "ended"})
            # Generate meeting summary in background
            # NOTE(review): the task reference is not retained anywhere; the
            # asyncio docs recommend keeping one until the task is done.
            asyncio.create_task(self._generateMeetingSummary(sessionId))
            logger.info(f"Bot left meeting for session {sessionId}")
        except Exception as e:
            logger.error(f"Error leaving meeting for session {sessionId}: {e}")
            interface.updateSession(sessionId, {
                "status": TeamsbotSessionStatus.ERROR.value,
                "errorMessage": str(e),
                "endedAt": getUtcTimestamp(),
            })
            await _emitSessionEvent(sessionId, "statusChange", {"status": "error", "errorMessage": str(e)})
    finally:
        # Cleanup event queue -- also when the error handling above fails.
        # Consumers already holding the queue still receive the queued events.
        _sessionEvents.pop(sessionId, None)
# =========================================================================
# Audio Processing Pipeline
# =========================================================================
async def handleAudioStream(self, websocket: WebSocket, sessionId: str):
    """
    Main audio processing loop.
    Receives PCM audio from bridge via WebSocket, processes through STT -> AI -> TTS pipeline.

    Accepted frames:
      - binary frames: raw audio bytes
      - text frames with JSON ``{"type": "audio_chunk", "data": <base64>}``
      - text frames ``{"type": "ping"}`` (answered with pong) and
        ``{"type": "session_ended"}`` (ends the loop)
    Malformed text frames are logged and skipped instead of ending the stream.
    Audio is buffered until about 1.5 s is available, then processed; whatever
    is left in the buffer when the loop ends is processed once more.
    """
    from . import interfaceFeatureTeamsbot as interfaceDb
    from modules.interfaces.interfaceVoiceObjects import getVoiceInterface
    interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
    voiceInterface = getVoiceInterface(self.currentUser, self.mandateId)
    audioBuffer = bytearray()
    targetBufferMs = 1500  # Buffer 1.5 seconds of audio before STT
    # PCM16 at 16kHz mono = 32000 bytes/second
    bytesPerSecond = 32000
    bytesPerMs = bytesPerSecond / 1000
    logger.info(f"Audio processing started for session {sessionId}")
    try:
        while True:
            # Receive audio data from bridge
            data = await websocket.receive()
            # A raw receive() hands the disconnect over as a message; calling
            # receive() again afterwards would raise.
            if data.get("type") == "websocket.disconnect":
                logger.info(f"Bridge disconnected audio stream for session {sessionId}")
                break
            audioChunk = None
            # Both keys may be present with one of them None, so test the
            # value rather than key membership.
            if data.get("bytes") is not None:
                audioChunk = data["bytes"]
            elif data.get("text") is not None:
                # JSON message (control messages)
                try:
                    message = json.loads(data["text"])
                except json.JSONDecodeError as decodeErr:
                    logger.warning(f"Ignoring malformed JSON frame for session {sessionId}: {decodeErr}")
                    continue
                if not isinstance(message, dict):
                    continue
                msgType = message.get("type")
                if msgType == "audio_chunk":
                    try:
                        audioChunk = base64.b64decode(message.get("data", ""))
                    except (ValueError, TypeError) as decodeErr:
                        logger.warning(f"Ignoring undecodable audio chunk for session {sessionId}: {decodeErr}")
                        continue
                elif msgType == "session_ended":
                    logger.info(f"Bridge signaled session ended: {sessionId}")
                    break
                elif msgType == "ping":
                    await websocket.send_text(json.dumps({"type": "pong"}))
                    continue
                else:
                    continue
            if not audioChunk:
                # Nothing to buffer (unknown frame or empty payload).
                continue
            # Accumulate audio in buffer
            audioBuffer.extend(audioChunk)
            bufferDurationMs = len(audioBuffer) / bytesPerMs
            # Process when buffer has enough audio
            if bufferDurationMs >= targetBufferMs:
                await self._processAudioBuffer(
                    bytes(audioBuffer),
                    sessionId,
                    interface,
                    voiceInterface,
                    websocket,
                )
                audioBuffer.clear()
    except Exception as e:
        # Disconnect-related errors are an expected way for the stream to end.
        if "disconnect" not in str(e).lower():
            logger.error(f"Audio stream error for session {sessionId}: {e}")
    # Process remaining buffer
    if len(audioBuffer) > 0:
        await self._processAudioBuffer(
            bytes(audioBuffer),
            sessionId,
            interface,
            voiceInterface,
            websocket,
        )
    logger.info(f"Audio processing ended for session {sessionId}")
async def _processAudioBuffer(
    self,
    audioBytes: bytes,
    sessionId: str,
    interface,
    voiceInterface,
    websocket: WebSocket,
):
    """Transcribe one buffered audio chunk, persist it and maybe trigger the AI.

    Steps: speech-to-text, store the transcript, append it to the rolling
    context buffer, emit a ``transcript`` event, bump the session's segment
    counter, then hand over to ``_analyzeAndRespond`` unless the response mode
    is TRANSCRIBE_ONLY or ``_shouldTriggerAnalysis`` declines. STT failures and
    empty transcripts end the call quietly.
    """
    # Step 1: STT -- convert audio to text
    try:
        sttResult = voiceInterface.speechToText(
            audioContent=audioBytes,
            language=self.config.language,
            sampleRate=16000,
            channels=1
        )
    except Exception as e:
        logger.warning(f"STT failed for session {sessionId}: {e}")
        return
    # The STT result is either a dict with details or something convertible to text.
    resultIsDict = isinstance(sttResult, dict)
    if resultIsDict:
        transcriptText = sttResult.get("text", "").strip()
    else:
        transcriptText = str(sttResult).strip()
    if not transcriptText:
        return
    if resultIsDict:
        confidence = sttResult.get("confidence", 0.0)
        speaker = sttResult.get("speaker")
    else:
        confidence = 0.0
        speaker = None
    # Store transcript segment
    transcriptModel = TeamsbotTranscript(
        sessionId=sessionId,
        speaker=speaker,
        text=transcriptText,
        timestamp=getUtcTimestamp(),
        confidence=confidence,
        language=self.config.language,
        isFinal=True,
    )
    createdTranscript = interface.createTranscript(transcriptModel.model_dump())
    # Update context buffer
    contextEntry = {
        "speaker": speaker or "Unknown",
        "text": transcriptText,
        "timestamp": getUtcTimestamp(),
    }
    self._contextBuffer.append(contextEntry)
    # Keep only last N segments
    maxSegments = self.config.contextWindowSegments
    if len(self._contextBuffer) > maxSegments:
        self._contextBuffer = self._contextBuffer[-maxSegments:]
    # Emit SSE event for live transcript
    transcriptEvent = {
        "id": createdTranscript.get("id"),
        "speaker": speaker,
        "text": transcriptText,
        "confidence": confidence,
        "timestamp": getUtcTimestamp(),
    }
    await _emitSessionEvent(sessionId, "transcript", transcriptEvent)
    # Update session transcript count (read, increment, write back)
    sessionRecord = interface.getSession(sessionId)
    if sessionRecord:
        interface.updateSession(
            sessionId,
            {"transcriptSegmentCount": sessionRecord.get("transcriptSegmentCount", 0) + 1},
        )
    # Step 2: Check if AI analysis should be triggered
    if self.config.responseMode == TeamsbotResponseMode.TRANSCRIBE_ONLY:
        return
    if self._shouldTriggerAnalysis(transcriptText):
        # Step 3: SPEECH_TEAMS AI analysis
        await self._analyzeAndRespond(sessionId, interface, voiceInterface, websocket, createdTranscript)
def _shouldTriggerAnalysis(self, transcriptText: str) -> bool:
    """
    Decide whether to trigger AI analysis based on the latest transcript.

    Order of checks:
    - Cooldown: never trigger within ``triggerCooldownSeconds`` of the last AI
      call, not even when the bot name is mentioned.
    - Bot name mentioned (case-insensitive substring match): trigger.
    - Periodic: trigger once ``triggerIntervalSeconds`` have elapsed.

    An empty, blank or missing bot name is ignored for the name check;
    otherwise the empty string would "match" every transcript and turn the
    name trigger into a trigger on every segment.

    Args:
        transcriptText: Text of the latest transcript segment.

    Returns:
        True if an AI analysis should run now.
    """
    now = time.time()
    # Cooldown check
    timeSinceLastCall = now - self._lastAiCallTime
    if timeSinceLastCall < self.config.triggerCooldownSeconds:
        return False
    # Bot name mentioned -> immediate trigger
    botNameLower = (self.config.botName or "").strip().lower()
    if botNameLower and botNameLower in transcriptText.lower():
        logger.debug(f"Trigger: Bot name '{self.config.botName}' detected in transcript")
        return True
    # Periodic trigger
    if timeSinceLastCall >= self.config.triggerIntervalSeconds:
        logger.debug(f"Trigger: Periodic interval ({self.config.triggerIntervalSeconds}s) elapsed")
        return True
    return False
async def _analyzeAndRespond(
    self,
    sessionId: str,
    interface,
    voiceInterface,
    websocket: WebSocket,
    triggerTranscript: Dict[str, Any],
):
    """Run SPEECH_TEAMS AI analysis and respond if needed.

    Builds the prompt context from the rolling transcript buffer, calls the AI,
    parses its structured answer and always emits an ``analysis`` event. If the
    AI wants to respond: in MANUAL mode only a ``suggestedResponse`` event is
    emitted; otherwise the text is synthesized, sent to the bridge as
    ``tts_audio``, stored as a bot response and announced via ``botResponse``.

    The stored ``responseType`` is BOTH only when audio was actually sent; it
    is CHAT when TTS failed or returned no audio.

    Any failure is logged and emitted as an ``error`` event; nothing is raised.

    Args:
        sessionId: Session being processed.
        interface: Teamsbot DB interface used for persistence.
        voiceInterface: Voice interface providing ``textToSpeech``.
        websocket: Bridge connection the TTS audio is sent over.
        triggerTranscript: Stored transcript record that triggered the analysis.
    """
    # Stamped before the call, so the cooldown also applies when the call fails.
    self._lastAiCallTime = time.time()
    # Build transcript context from buffer
    contextLines = [
        f"[{segment.get('speaker', 'Unknown')}]: {segment.get('text', '')}"
        for segment in self._contextBuffer
    ]
    transcriptContext = f"BOT_NAME:{self.config.botName}\n" + "\n".join(contextLines)
    # Call SPEECH_TEAMS
    try:
        from modules.services.serviceAi.mainServiceAi import AiService
        # Create AiService with service center context
        # Note: In production, serviceCenter should be passed properly
        aiService = AiService(serviceCenter=None)
        await aiService.ensureAiObjectsInitialized()
        request = AiCallRequest(
            prompt=self.config.aiSystemPrompt,
            context=transcriptContext,
            options=AiCallOptions(
                operationType=OperationTypeEnum.SPEECH_TEAMS,
                priority=PriorityEnum.SPEED,
            )
        )
        response = await aiService.callAi(request)
        # Parse structured response
        try:
            speechResult = SpeechTeamsResponse.model_validate_json(response.content)
        except Exception:
            # Try to extract JSON from a fenced code block in the response content
            try:
                jsonStr = response.content
                if "```json" in jsonStr:
                    jsonStr = jsonStr.split("```json")[1].split("```")[0]
                elif "```" in jsonStr:
                    jsonStr = jsonStr.split("```")[1].split("```")[0]
                speechResult = SpeechTeamsResponse.model_validate_json(jsonStr.strip())
            except Exception as parseErr:
                # Unparseable answer: treat it as "do not respond".
                logger.warning(f"Failed to parse SPEECH_TEAMS response: {parseErr}")
                speechResult = SpeechTeamsResponse(
                    shouldRespond=False,
                    reasoning=f"Parse error: {str(parseErr)[:100]}",
                    detectedIntent="none"
                )
        # reasoning may be absent; never let the log line abort the analysis.
        reasoningPreview = (speechResult.reasoning or "")[:80]
        logger.info(
            f"SPEECH_TEAMS result: shouldRespond={speechResult.shouldRespond}, "
            f"intent={speechResult.detectedIntent}, "
            f"reasoning={reasoningPreview}..."
        )
        # Emit analysis event (always, for debug/UI)
        await _emitSessionEvent(sessionId, "analysis", {
            "shouldRespond": speechResult.shouldRespond,
            "detectedIntent": speechResult.detectedIntent,
            "reasoning": speechResult.reasoning,
            "modelName": response.modelName,
            "processingTime": response.processingTime,
            "priceCHF": response.priceCHF,
        })
        # Step 4: Respond if AI decided to
        if speechResult.shouldRespond and speechResult.responseText:
            if self.config.responseMode == TeamsbotResponseMode.MANUAL:
                # In manual mode, suggest but don't send
                await _emitSessionEvent(sessionId, "suggestedResponse", {
                    "responseText": speechResult.responseText,
                    "detectedIntent": speechResult.detectedIntent,
                    "reasoning": speechResult.reasoning,
                })
                return
            # Auto mode. Start from chat-only and upgrade once audio was sent.
            responseType = TeamsbotResponseType.CHAT
            # 4a: TTS -> Audio to bridge
            try:
                ttsResult = voiceInterface.textToSpeech(
                    text=speechResult.responseText,
                    languageCode=self.config.language,
                    voiceName=self.config.voiceId
                )
                audioContent = ttsResult.get("audioContent") if isinstance(ttsResult, dict) else None
                if audioContent:
                    # Send TTS audio to bridge (base64 inside a JSON text frame)
                    audioBytes = audioContent if isinstance(audioContent, bytes) else audioContent.encode()
                    await websocket.send_text(json.dumps({
                        "type": "tts_audio",
                        "data": base64.b64encode(audioBytes).decode(),
                        "format": "mp3",
                    }))
                    responseType = TeamsbotResponseType.BOTH
            except Exception as ttsErr:
                # responseType stays CHAT: no audio reached the bridge.
                logger.warning(f"TTS failed for session {sessionId}: {ttsErr}")
            # 4b: Store bot response
            botResponseData = TeamsbotBotResponse(
                sessionId=sessionId,
                responseText=speechResult.responseText,
                responseType=responseType,
                detectedIntent=speechResult.detectedIntent,
                reasoning=speechResult.reasoning,
                triggeredByTranscriptId=triggerTranscript.get("id"),
                modelName=response.modelName,
                processingTime=response.processingTime,
                priceCHF=response.priceCHF,
                timestamp=getUtcTimestamp(),
            ).model_dump()
            createdResponse = interface.createBotResponse(botResponseData)
            # 4c: Emit SSE event
            await _emitSessionEvent(sessionId, "botResponse", {
                "id": createdResponse.get("id"),
                "responseText": speechResult.responseText,
                "responseType": responseType.value,
                "detectedIntent": speechResult.detectedIntent,
                "reasoning": speechResult.reasoning,
                "modelName": response.modelName,
                "processingTime": response.processingTime,
                "priceCHF": response.priceCHF,
            })
            # Update session response count (read, increment, write back)
            session = interface.getSession(sessionId)
            if session:
                count = session.get("botResponseCount", 0) + 1
                interface.updateSession(sessionId, {"botResponseCount": count})
            logger.info(f"Bot responded in session {sessionId}: intent={speechResult.detectedIntent}")
    except Exception as e:
        logger.error(f"SPEECH_TEAMS analysis failed for session {sessionId}: {e}")
        await _emitSessionEvent(sessionId, "error", {"message": f"AI analysis failed: {str(e)}"})
# =========================================================================
# Meeting Summary
# =========================================================================
async def _generateMeetingSummary(self, sessionId: str):
    """
    Generate an AI summary of a meeting and store it on the session.

    Loads the transcripts of the session, skips sessions with fewer than
    five transcript entries, sends the transcript (one "[speaker]: text"
    line per entry) to AiService as a DATA_ANALYSE / BALANCED request and
    writes the returned content into the session's "summary" field.

    Args:
        sessionId: Identifier of the session to summarise.

    Returns:
        None. Every exception is caught and logged, nothing is raised.
    """
    try:
        from . import interfaceFeatureTeamsbot as interfaceDb
        from modules.services.serviceAi.mainServiceAi import AiService

        interface = interfaceDb.getInterface(self.currentUser, self.mandateId, self.instanceId)
        transcripts = interface.getTranscripts(sessionId)
        if not transcripts or len(transcripts) < 5:
            return  # Not enough content for a summary

        # One line per transcript entry, prefixed with the speaker
        fullTranscript = "\n".join(
            f"[{entry.get('speaker', 'Unknown')}]: {entry.get('text', '')}"
            for entry in transcripts
        )

        aiService = AiService(serviceCenter=None)
        await aiService.ensureAiObjectsInitialized()

        summaryOptions = AiCallOptions(
            operationType=OperationTypeEnum.DATA_ANALYSE,
            priority=PriorityEnum.BALANCED,
        )
        request = AiCallRequest(
            prompt="Erstelle eine kurze Zusammenfassung dieses Meeting-Transkripts. Nenne die wichtigsten Punkte, Entscheidungen und offene Aktionspunkte.",
            context=fullTranscript,
            options=summaryOptions,
        )
        response = await aiService.callAi(request)

        # A missing or error-flagged response used to be dropped without a
        # trace; log it so a missing summary can be diagnosed.
        if not response or response.errorCount != 0:
            logger.warning(f"Meeting summary AI call returned no usable result for session {sessionId}; summary not stored")
            return

        interface.updateSession(sessionId, {"summary": response.content})
        logger.info(f"Meeting summary generated for session {sessionId}")
    except Exception as e:
        logger.error(f"Failed to generate meeting summary for session {sessionId}: {e}")

View file

@ -58,9 +58,13 @@ SCOPES = [
"Mail.ReadWrite", # Read and write mail
"Mail.Send", # Send mail
"Files.ReadWrite.All", # Read and write files (SharePoint/OneDrive)
"Sites.ReadWrite.All" # Read and write SharePoint sites
"Sites.ReadWrite.All", # Read and write SharePoint sites
# Teams Bot: Meeting and chat access (requires admin consent)
"OnlineMeetings.Read.All", # Read Teams meeting details
"Chat.ReadWrite", # Read and write Teams chat messages
"ChatMessage.Send", # Send messages to Teams meeting chats
]
# NOTE: Sites.ReadWrite.All and Files.ReadWrite.All require admin consent.
# NOTE: Sites.ReadWrite.All, Files.ReadWrite.All, and Teams scopes require admin consent.
# An admin must grant consent ONCE at: /api/msft/adminconsent
# After that, all users can connect without individual admin approval.

View file

@ -108,6 +108,9 @@ def _getFeatureUiObjects(featureCode: str) -> List[Dict[str, Any]]:
elif featureCode == "automation":
from modules.features.automation.mainAutomation import UI_OBJECTS
return UI_OBJECTS
elif featureCode == "teamsbot":
from modules.features.teamsbot.mainTeamsbot import UI_OBJECTS
return UI_OBJECTS
else:
logger.warning(f"Unknown feature code: {featureCode}")
return []

View file

@ -95,6 +95,10 @@ class AiService:
3. billingCallback on aiObjects: records one billing transaction per model call
with exact provider + model name (set before AI call, invoked by _callWithModel)
"""
# SPEECH_TEAMS: Dedicated pipeline, bypasses standard model selection
if request.options and request.options.operationType == OperationTypeEnum.SPEECH_TEAMS:
return await self._handleSpeechTeams(request)
# FAIL-SAFE: Pre-flight billing validation (like 0 CHF credit card check)
self._preflightBillingCheck()
@ -127,6 +131,243 @@ class AiService:
self._storeAiCallStats(response, request)
return response
# =========================================================================
# SPEECH_TEAMS: Dedicated handler for Teams Meeting AI analysis
# Bypasses standard model selection. Uses a fixed fast model.
# =========================================================================
async def _handleSpeechTeams(self, request: AiCallRequest):
    """
    Dedicated handler for the SPEECH_TEAMS operation type.

    Bypasses the standard model selection and calls one fixed model
    (chosen by _getSpeechTeamsModel) directly, without a failover loop.

    Steps:
    1. Run the billing checks.
    2. Select the model; return an error response if none is available.
    3. Resolve the bot name and build the system prompt.
    4. Call the model with JSON result format and wrap the outcome.

    Bot name resolution (later sources win):
    - default "AI Assistant"
    - options.metadata["botName"], if options carries a dict named
      "metadata" with a non-blank string value
    - a first context line of the form "BOT_NAME:<name>"; that line is
      removed from the context before it is sent to the model

    Args:
        request: AiCallRequest with:
            - prompt: user-configured instructions appended to the base prompt
            - context: accumulated transcript text, optionally prefixed
              with a "BOT_NAME:" header line
            - options: optional; only "metadata" is read here

    Returns:
        AiCallResponse. On success, content is the raw model output and
        errorCount is 0. On failure (no model, unsuccessful or raising
        model call), content is a JSON string with shouldRespond=False and
        errorCount is 1.

    Raises:
        Whatever _preflightBillingCheck / _checkBillingBeforeAiCall raise;
        they run before the error handling starts.
    """
    from modules.datamodels.datamodelAi import AiCallResponse, AiModelCall, AiCallOptions, PriorityEnum

    defaultBotName = "AI Assistant"
    botNameHeader = "BOT_NAME:"

    def _declineResponse(reasoning, modelName, provider, processingTime):
        """Build the errorCount=1 response that tells the caller not to respond."""
        return AiCallResponse(
            content=json.dumps({"shouldRespond": False, "responseText": None, "reasoning": reasoning, "detectedIntent": "none"}),
            modelName=modelName,
            provider=provider,
            priceCHF=0.0,
            processingTime=processingTime,
            bytesSent=0,
            bytesReceived=0,
            errorCount=1
        )

    startTime = time.time()

    # Billing pre-flight (SPEECH_TEAMS also needs billing)
    self._preflightBillingCheck()
    await self._checkBillingBeforeAiCall()

    # 1. Select a fixed fast model (bypass model selector)
    model = self._getSpeechTeamsModel()
    if not model:
        return _declineResponse("No suitable model available for SPEECH_TEAMS", "error", "unknown", 0.0)

    # 2. Resolve the bot name. getattr keeps this safe when options is None
    #    or the options model has no "metadata" field.
    botName = defaultBotName
    metadata = getattr(request.options, "metadata", None)
    if isinstance(metadata, dict):
        metadataName = metadata.get("botName")
        if isinstance(metadataName, str) and metadataName.strip():
            botName = metadataName.strip()

    # A "BOT_NAME:<name>" header on the first context line overrides the
    # metadata value. Only the prefix is cut off, and an empty header keeps
    # the name resolved so far instead of blanking it.
    contextText = request.context or ""
    if contextText.startswith(botNameHeader):
        headerLine, _, contextText = contextText.partition("\n")
        headerName = headerLine[len(botNameHeader):].strip()
        if headerName:
            botName = headerName

    userSystemPrompt = request.prompt or ""
    systemPrompt = self._buildSpeechTeamsSystemPrompt(userSystemPrompt, botName)

    # 3. Build messages
    messages = [
        {"role": "system", "content": systemPrompt},
        {"role": "user", "content": contextText}
    ]

    # 4. Call model directly (no failover loop -- single fast model)
    modelCall = AiModelCall(
        messages=messages,
        model=model,
        options=AiCallOptions(
            operationType=OperationTypeEnum.SPEECH_TEAMS,
            priority=PriorityEnum.SPEED,
            temperature=0.3,
            resultFormat="json"
        )
    )

    # Set billing callback
    # NOTE(review): this is stored on the shared aiObjects and reset to None
    # in the finally block below; confirm no concurrent call relies on it.
    self.aiObjects.billingCallback = self._createBillingCallback()
    try:
        inputBytes = len((systemPrompt + contextText).encode('utf-8'))
        modelResponse = await model.functionCall(modelCall)
        if not modelResponse.success:
            raise ValueError(f"SPEECH_TEAMS model call failed: {modelResponse.error}")

        content = modelResponse.content
        outputBytes = len(content.encode('utf-8'))
        processingTime = time.time() - startTime
        priceCHF = model.calculatepriceCHF(processingTime, inputBytes, outputBytes)

        response = AiCallResponse(
            content=content,
            modelName=model.name,
            provider=model.connectorType,
            priceCHF=priceCHF,
            processingTime=processingTime,
            bytesSent=inputBytes,
            bytesReceived=outputBytes,
            errorCount=0
        )

        # Record billing; a billing failure is logged but does not fail the call
        if self.aiObjects.billingCallback:
            try:
                self.aiObjects.billingCallback(response)
            except Exception as e:
                logger.error(f"BILLING: Failed to record billing for SPEECH_TEAMS: {e}")

        # Store stats
        self._storeAiCallStats(response, request)
        logger.info(f"SPEECH_TEAMS call completed: model={model.name}, time={processingTime:.2f}s, cost={priceCHF:.4f} CHF")
        return response
    except Exception as e:
        processingTime = time.time() - startTime
        logger.error(f"SPEECH_TEAMS call failed: {e}")
        # model is always set here: the no-model case returned above
        return _declineResponse(f"Error: {e}", model.name, model.connectorType, processingTime)
    finally:
        self.aiObjects.billingCallback = None
def _getSpeechTeamsModel(self):
    """
    Pick the model used for SPEECH_TEAMS calls.

    Selection order:
    1. The first model whose lower-cased name contains one of the preferred
       name fragments, tried in this order: gpt-4o-mini, claude-3-5-haiku,
       gpt-4o, claude-sonnet-4-5.
    2. Among models that list DATA_ANALYSE in their operation types, the
       one with the highest speedRating (ties broken by lowest
       costPer1kTokensInput).
    3. The first model of the registry list.

    In every step a model is only eligible if it has a functionCall and
    isAvailable is truthy.

    Returns:
        The selected model, or None if the registry is empty or no model
        is eligible.
    """
    from modules.aicore.aicoreModelRegistry import modelRegistry

    candidates = modelRegistry.getAvailableModels()
    if not candidates:
        logger.error("SPEECH_TEAMS: No models available in registry")
        return None

    def _isEligible(entry):
        # Eligible = can be called directly and is currently available
        return entry.functionCall and entry.isAvailable

    # Preferred models for SPEECH_TEAMS (fast + cheap first, quality models after)
    preferredFragments = (
        "gpt-4o-mini",        # OpenAI: fast, cheap, good at JSON
        "claude-3-5-haiku",   # Anthropic: fast, cheap
        "gpt-4o",             # OpenAI: fallback to quality model
        "claude-sonnet-4-5",  # Anthropic: fallback
    )
    for fragment in preferredFragments:
        preferred = next(
            (
                entry for entry in candidates
                if fragment in entry.name.lower() and entry.functionCall and entry.isAvailable
            ),
            None,
        )
        if preferred is not None:
            logger.info(f"SPEECH_TEAMS: Selected preferred model '{preferred.name}' ({preferred.displayName})")
            return preferred

    # Fallback: eligible models that declare the DATA_ANALYSE operation type
    analysts = [
        entry for entry in candidates
        if _isEligible(entry)
        and any(op.operationType == OperationTypeEnum.DATA_ANALYSE for op in entry.operationTypes)
    ]
    if analysts:
        # Fastest first; cheaper input price wins among equally fast models
        ranked = sorted(analysts, key=lambda entry: (-entry.speedRating, entry.costPer1kTokensInput))
        bestModel = ranked[0]
        logger.info(f"SPEECH_TEAMS: Selected fallback model '{bestModel.name}' (speed={bestModel.speedRating})")
        return bestModel

    # Last resort: first eligible model in registry order
    lastResort = next((entry for entry in candidates if _isEligible(entry)), None)
    if lastResort is not None:
        logger.warning(f"SPEECH_TEAMS: Using last-resort model '{lastResort.name}'")
    return lastResort
def _buildSpeechTeamsSystemPrompt(self, userSystemPrompt: str, botName: str) -> str:
"""
Build the specialized system prompt for SPEECH_TEAMS meeting analysis.
Combines a fixed base prompt with user-configurable instructions.
"""
basePrompt = f"""Du bist "{botName}", ein AI-Teilnehmer in einem Microsoft Teams Meeting.
Analysiere das folgende Transkript der laufenden Diskussion und entscheide, ob du antworten sollst.
ANTWORTE NUR wenn:
1. Du direkt angesprochen wirst (z.B. "{botName}, was denkst du?" oder "Hey {botName}")
2. Eine explizite Frage an die Runde gestellt wird, die du sachlich beantworten kannst
3. Du einen wertvollen, nicht-offensichtlichen Beitrag zur Diskussion hast
ANTWORTE NICHT wenn:
- Die Teilnehmer normal miteinander sprechen ohne dich einzubeziehen
- Die Diskussion keinen Input von dir benoetigt
- Du nur wiederholen wuerdest, was bereits gesagt wurde
- Es sich um Smalltalk oder Begruessung handelt
ANTWORT-STIL:
- Halte dich kurz und praezise (max 2-3 Saetze fuer Sprache)
- Sei professionell aber natuerlich
- Beziehe dich konkret auf das Gesagte"""
# Append user-configured instructions if provided
if userSystemPrompt and userSystemPrompt.strip():
basePrompt += f"\n\nZUSAETZLICHE ANWEISUNGEN:\n{userSystemPrompt.strip()}"
basePrompt += f"""
WICHTIG: Antworte IMMER als valides JSON in exakt diesem Format:
{{
"shouldRespond": true/false,
"responseText": "Deine Antwort hier" oder null,
"reasoning": "Kurze Begruendung deiner Entscheidung",
"detectedIntent": "addressed" | "question" | "proactive" | "none"
}}
detectedIntent-Werte:
- "addressed": {botName} wurde direkt angesprochen
- "question": Eine allgemeine Frage wurde gestellt
- "proactive": Du hast einen wertvollen proaktiven Beitrag
- "none": Kein Handlungsbedarf"""
return basePrompt
def _preflightBillingCheck(self) -> None:
"""