security: restrict system bot access to SysAdmin only

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
patrick-motsch 2026-02-18 21:03:08 +01:00
parent 1227324703
commit 3fa5b98f47

View file

@ -201,44 +201,49 @@ async def startSession(
botAccountEmail = None
botAccountPassword = None
# Load system bot from DB (try mandate-specific first, then any active bot)
systemBot = interface.getActiveSystemBot(mandateId)
if not systemBot:
from .datamodelTeamsbot import TeamsbotSystemBot
allBots = interface.db.getRecordset(TeamsbotSystemBot, recordFilter={"isActive": True})
if allBots:
systemBot = allBots[0]
logger.info(f"No mandate-specific system bot, using fallback: {systemBot.get('name')} ({systemBot.get('email')})")
# System bot access: only SysAdmin can use the system bot account.
# Non-SysAdmin users are forced to anonymous join to prevent concurrent
# access conflicts (a Microsoft account can only be in one meeting at a time).
if context.isSysAdmin:
systemBot = interface.getActiveSystemBot(mandateId)
if not systemBot:
from .datamodelTeamsbot import TeamsbotSystemBot
allBots = interface.db.getRecordset(TeamsbotSystemBot, recordFilter={"isActive": True})
if allBots:
systemBot = allBots[0]
logger.info(f"No mandate-specific system bot, using fallback: {systemBot.get('name')} ({systemBot.get('email')})")
if systemBot:
if not effectiveBotName:
effectiveBotName = systemBot.get("name")
# Derive display name from email if system bot has no explicit name
# e.g. "nyla.larsson@poweron.swiss" → "Nyla Larsson"
if not effectiveBotName:
sbEmail = systemBot.get("email", "")
if sbEmail and "@" in sbEmail:
emailPrefix = sbEmail.split("@")[0]
effectiveBotName = " ".join(part.capitalize() for part in emailPrefix.split("."))
logger.info(f"Bot name derived from email: {effectiveBotName}")
if not effectiveBotName:
effectiveBotName = effectiveConfig.botName
logger.info(f"System bot found: {systemBot.get('name')} ({systemBot.get('email')}), effectiveBotName={effectiveBotName}")
if systemBot:
if not effectiveBotName:
effectiveBotName = systemBot.get("name")
if not effectiveBotName:
sbEmail = systemBot.get("email", "")
if sbEmail and "@" in sbEmail:
emailPrefix = sbEmail.split("@")[0]
effectiveBotName = " ".join(part.capitalize() for part in emailPrefix.split("."))
logger.info(f"Bot name derived from email: {effectiveBotName}")
if not effectiveBotName:
effectiveBotName = effectiveConfig.botName
logger.info(f"System bot found: {systemBot.get('name')} ({systemBot.get('email')}), effectiveBotName={effectiveBotName}")
# Load and decrypt credentials for authenticated join
botAccountEmail = systemBot.get("email")
encryptedPwd = systemBot.get("encryptedPassword")
if botAccountEmail and encryptedPwd:
try:
from modules.shared.configuration import decryptValue
botAccountPassword = decryptValue(encryptedPwd, userId=str(context.user.id), keyName="systemBotPassword")
logger.info(f"System bot credentials loaded and decrypted for: {botAccountEmail}")
except Exception as e:
logger.warning(f"Could not decrypt system bot password: {e} — falling back to anonymous join")
botAccountEmail = None
botAccountPassword = None
botAccountEmail = systemBot.get("email")
encryptedPwd = systemBot.get("encryptedPassword")
if botAccountEmail and encryptedPwd:
try:
from modules.shared.configuration import decryptValue
botAccountPassword = decryptValue(encryptedPwd, userId=str(context.user.id), keyName="systemBotPassword")
logger.info(f"System bot credentials loaded and decrypted for: {botAccountEmail}")
except Exception as e:
logger.warning(f"Could not decrypt system bot password: {e} — falling back to anonymous join")
botAccountEmail = None
botAccountPassword = None
else:
logger.info("No system bot found in DB — using anonymous join")
else:
logger.info("No system bot found in DB — using anonymous join")
if body.joinMode == TeamsbotJoinMode.SYSTEM_BOT:
logger.warning(f"Non-SysAdmin user {context.user.id} attempted to use system bot — forced to anonymous join")
joinMode = TeamsbotJoinMode.ANONYMOUS
logger.info(f"Non-SysAdmin user {context.user.id}: using anonymous join (system bot restricted to SysAdmin)")
if not effectiveBotName:
effectiveBotName = effectiveConfig.botName
@ -588,6 +593,8 @@ async def listSystemBots(
context: RequestContext = Depends(getRequestContext),
):
"""List all system bot accounts for this mandate. Passwords are never returned."""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
bots = interface.getSystemBots(mandateId)
@ -602,6 +609,8 @@ async def createSystemBot(
context: RequestContext = Depends(getRequestContext),
):
"""Create a new system bot account. Password is encrypted before storage."""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
@ -643,6 +652,8 @@ async def deleteSystemBot(
context: RequestContext = Depends(getRequestContext),
):
"""Delete a system bot account."""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
@ -745,6 +756,8 @@ async def testAuth(
receive the /v2/ (authenticated) vs light-meetings (anonymous) page.
Does NOT join the meeting only checks which page Teams serves.
"""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing (uses system bot credentials)")
import aiohttp
mandateId = _validateInstanceAccess(instanceId, context)
@ -855,6 +868,8 @@ async def getTestAuthVariants(
Get list of available test variant IDs from the Browser Bot.
Frontend calls this once, then runs each variant individually.
"""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing")
import aiohttp
_validateInstanceAccess(instanceId, context)
@ -889,6 +904,8 @@ async def testAuthSingleVariant(
Run a single test variant. Frontend calls this once per variant (sequentially).
Each call stays within Azure's 240s timeout.
"""
if not context.isSysAdmin:
raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing (uses system bot credentials)")
import aiohttp
mandateId = _validateInstanceAccess(instanceId, context)