From 3fa5b98f472e5dbf2021eb19870517fd45eebac4 Mon Sep 17 00:00:00 2001
From: patrick-motsch
Date: Wed, 18 Feb 2026 21:03:08 +0100
Subject: [PATCH] security: restrict system bot access to SysAdmin only
Co-authored-by: Cursor
---
.../features/teamsbot/routeFeatureTeamsbot.py | 87 +++++++++++--------
1 file changed, 52 insertions(+), 35 deletions(-)
diff --git a/modules/features/teamsbot/routeFeatureTeamsbot.py b/modules/features/teamsbot/routeFeatureTeamsbot.py
index abac3ef0..52a3ba72 100644
--- a/modules/features/teamsbot/routeFeatureTeamsbot.py
+++ b/modules/features/teamsbot/routeFeatureTeamsbot.py
@@ -201,44 +201,49 @@ async def startSession(
botAccountEmail = None
botAccountPassword = None
- # Load system bot from DB (try mandate-specific first, then any active bot)
- systemBot = interface.getActiveSystemBot(mandateId)
- if not systemBot:
- from .datamodelTeamsbot import TeamsbotSystemBot
- allBots = interface.db.getRecordset(TeamsbotSystemBot, recordFilter={"isActive": True})
- if allBots:
- systemBot = allBots[0]
- logger.info(f"No mandate-specific system bot, using fallback: {systemBot.get('name')} ({systemBot.get('email')})")
+ # System bot access: only SysAdmin can use the system bot account.
+ # Non-SysAdmin users are forced to anonymous join to prevent concurrent
+ # access conflicts (a Microsoft account can only be in one meeting at a time).
+ if context.isSysAdmin:
+ systemBot = interface.getActiveSystemBot(mandateId)
+ if not systemBot:
+ from .datamodelTeamsbot import TeamsbotSystemBot
+ allBots = interface.db.getRecordset(TeamsbotSystemBot, recordFilter={"isActive": True})
+ if allBots:
+ systemBot = allBots[0]
+ logger.info(f"No mandate-specific system bot, using fallback: {systemBot.get('name')} ({systemBot.get('email')})")
- if systemBot:
- if not effectiveBotName:
- effectiveBotName = systemBot.get("name")
- # Derive display name from email if system bot has no explicit name
- # e.g. "nyla.larsson@poweron.swiss" → "Nyla Larsson"
- if not effectiveBotName:
- sbEmail = systemBot.get("email", "")
- if sbEmail and "@" in sbEmail:
- emailPrefix = sbEmail.split("@")[0]
- effectiveBotName = " ".join(part.capitalize() for part in emailPrefix.split("."))
- logger.info(f"Bot name derived from email: {effectiveBotName}")
- if not effectiveBotName:
- effectiveBotName = effectiveConfig.botName
- logger.info(f"System bot found: {systemBot.get('name')} ({systemBot.get('email')}), effectiveBotName={effectiveBotName}")
+ if systemBot:
+ if not effectiveBotName:
+ effectiveBotName = systemBot.get("name")
+ if not effectiveBotName:
+ sbEmail = systemBot.get("email", "")
+ if sbEmail and "@" in sbEmail:
+ emailPrefix = sbEmail.split("@")[0]
+ effectiveBotName = " ".join(part.capitalize() for part in emailPrefix.split("."))
+ logger.info(f"Bot name derived from email: {effectiveBotName}")
+ if not effectiveBotName:
+ effectiveBotName = effectiveConfig.botName
+ logger.info(f"System bot found: {systemBot.get('name')} ({systemBot.get('email')}), effectiveBotName={effectiveBotName}")
- # Load and decrypt credentials for authenticated join
- botAccountEmail = systemBot.get("email")
- encryptedPwd = systemBot.get("encryptedPassword")
- if botAccountEmail and encryptedPwd:
- try:
- from modules.shared.configuration import decryptValue
- botAccountPassword = decryptValue(encryptedPwd, userId=str(context.user.id), keyName="systemBotPassword")
- logger.info(f"System bot credentials loaded and decrypted for: {botAccountEmail}")
- except Exception as e:
- logger.warning(f"Could not decrypt system bot password: {e} — falling back to anonymous join")
- botAccountEmail = None
- botAccountPassword = None
+ botAccountEmail = systemBot.get("email")
+ encryptedPwd = systemBot.get("encryptedPassword")
+ if botAccountEmail and encryptedPwd:
+ try:
+ from modules.shared.configuration import decryptValue
+ botAccountPassword = decryptValue(encryptedPwd, userId=str(context.user.id), keyName="systemBotPassword")
+ logger.info(f"System bot credentials loaded and decrypted for: {botAccountEmail}")
+ except Exception as e:
+ logger.warning(f"Could not decrypt system bot password: {e} — falling back to anonymous join")
+ botAccountEmail = None
+ botAccountPassword = None
+ else:
+ logger.info("No system bot found in DB — using anonymous join")
else:
- logger.info("No system bot found in DB — using anonymous join")
+ if body.joinMode == TeamsbotJoinMode.SYSTEM_BOT:
+ logger.warning(f"Non-SysAdmin user {context.user.id} attempted to use system bot — forced to anonymous join")
+ joinMode = TeamsbotJoinMode.ANONYMOUS
+ logger.info(f"Non-SysAdmin user {context.user.id}: using anonymous join (system bot restricted to SysAdmin)")
if not effectiveBotName:
effectiveBotName = effectiveConfig.botName
@@ -588,6 +593,8 @@ async def listSystemBots(
context: RequestContext = Depends(getRequestContext),
):
"""List all system bot accounts for this mandate. Passwords are never returned."""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
bots = interface.getSystemBots(mandateId)
@@ -602,6 +609,8 @@ async def createSystemBot(
context: RequestContext = Depends(getRequestContext),
):
"""Create a new system bot account. Password is encrypted before storage."""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
mandateId = _validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
@@ -643,6 +652,8 @@ async def deleteSystemBot(
context: RequestContext = Depends(getRequestContext),
):
"""Delete a system bot account."""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required to manage system bots")
_validateInstanceAccess(instanceId, context)
interface = _getInterface(context, instanceId)
@@ -745,6 +756,8 @@ async def testAuth(
receive the /v2/ (authenticated) vs light-meetings (anonymous) page.
Does NOT join the meeting — only checks which page Teams serves.
"""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing (uses system bot credentials)")
import aiohttp
mandateId = _validateInstanceAccess(instanceId, context)
@@ -855,6 +868,8 @@ async def getTestAuthVariants(
Get list of available test variant IDs from the Browser Bot.
Frontend calls this once, then runs each variant individually.
"""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing")
import aiohttp
_validateInstanceAccess(instanceId, context)
@@ -889,6 +904,8 @@ async def testAuthSingleVariant(
Run a single test variant. Frontend calls this once per variant (sequentially).
Each call stays within Azure's 240s timeout.
"""
+ if not context.isSysAdmin:
+ raise HTTPException(status_code=403, detail="SysAdmin privileges required for auth testing (uses system bot credentials)")
import aiohttp
mandateId = _validateInstanceAccess(instanceId, context)