gateway/modules/routes/routeSecurityMsft.py
ValueOn AG 0a0973d41b hotfix msft/google login tokens end to end separated from connection
feat(billing): Nutzerhinweise bei leerem Budget + Mandats-Mail (402/SSE)
Gateway
- InsufficientBalanceException: billingModel, userAction (TOP_UP_SELF /
  CONTACT_MANDATE_ADMIN), DE/EN-Texte, toClientDict(), fromBalanceCheck()
- HTTP 402 + JSON detail für globale API-Fehlerbehandlung
- AI/Chatbot: vor Raise ggf. E-Mail an BillingSettings.notifyEmails
  (PREPAY_MANDATE, Throttle 1h/Mandat) via billingExhaustedNotify
- Agent-Loop & Workspace-Route: SSE-ERROR mit strukturiertem Billing-Payload
- datamodelBilling: notifyEmails-Doku für Pool-Alerts
frontend_nyla
- useWorkspace: SSE onError für INSUFFICIENT_BALANCE mit messageDe/En
  und Hinweis auf Billing-Pfad bei TOP_UP_SELF
2026-03-21 01:34:40 +01:00

704 lines
26 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Routes for Microsoft authentication — split Auth app vs Data app.
See wiki: concepts/OAuth-Auth-vs-Data-Connection-Konzept.md
"""
from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import json
import time
from typing import Dict, Any, Optional
from urllib.parse import quote
import msal
import httpx
from jose import jwt as jose_jwt
from jose import JWTError
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbApp import getInterface, getRootInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token, TokenPurpose
from modules.auth import getCurrentUser, limiter, SECRET_KEY, ALGORITHM
from modules.auth import (
createAccessToken,
setAccessTokenCookie,
createRefreshToken,
setRefreshTokenCookie,
clearAccessTokenCookie,
clearRefreshTokenCookie,
)
from modules.auth.tokenManager import TokenManager
from modules.auth.oauthProviderConfig import msftAuthScopes, msftDataScopes, msftDataScopesForRefresh
from modules.shared.timeUtils import createExpirationTimestamp, getUtcTimestamp, parseTimestamp
logger = logging.getLogger(__name__)
_FLOW_LOGIN = "msft_login"
_FLOW_CONNECT = "msft_connect"
router = APIRouter(
prefix="/api/msft",
tags=["Security Microsoft"],
responses={
404: {"description": "Not found"},
400: {"description": "Bad request"},
401: {"description": "Unauthorized"},
403: {"description": "Forbidden"},
500: {"description": "Internal server error"},
},
)
AUTH_CLIENT_ID = APP_CONFIG.get("Service_MSFT_AUTH_CLIENT_ID")
AUTH_CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_AUTH_CLIENT_SECRET")
AUTH_REDIRECT_URI = APP_CONFIG.get("Service_MSFT_AUTH_REDIRECT_URI")
DATA_CLIENT_ID = APP_CONFIG.get("Service_MSFT_DATA_CLIENT_ID")
DATA_CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_DATA_CLIENT_SECRET")
DATA_REDIRECT_URI = APP_CONFIG.get("Service_MSFT_DATA_REDIRECT_URI")
TENANT_ID = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
def _issue_oauth_state(claims: Dict[str, Any]) -> str:
body = {**claims, "exp": int(time.time()) + 600}
return jose_jwt.encode(body, SECRET_KEY, algorithm=ALGORITHM)
def _parse_oauth_state(state: str) -> Dict[str, Any]:
try:
return jose_jwt.decode(state, SECRET_KEY, algorithms=[ALGORITHM])
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid OAuth state: {e}"
) from e
def _require_msft_auth_config():
if not AUTH_CLIENT_ID or not AUTH_CLIENT_SECRET or not AUTH_REDIRECT_URI:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Microsoft Auth OAuth is not configured (Service_MSFT_AUTH_*)",
)
def _require_msft_data_config():
if not DATA_CLIENT_ID or not DATA_CLIENT_SECRET or not DATA_REDIRECT_URI:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Microsoft Data OAuth is not configured (Service_MSFT_DATA_*)",
)
def _admin_consent_redirect_uri() -> str:
if "/auth/connect/callback" in (DATA_REDIRECT_URI or ""):
return DATA_REDIRECT_URI.replace("/auth/connect/callback", "/adminconsent/callback")
if DATA_REDIRECT_URI:
return DATA_REDIRECT_URI.rstrip("/").rsplit("/", 1)[0] + "/adminconsent/callback"
return ""
@router.get("/auth/login")
@limiter.limit("5/minute")
def auth_login(request: Request) -> RedirectResponse:
"""Start Microsoft login (Auth app — User.Read only)."""
try:
_require_msft_auth_config()
msal_app = msal.ConfidentialClientApplication(
AUTH_CLIENT_ID,
authority=AUTHORITY,
client_credential=AUTH_CLIENT_SECRET,
)
state_jwt = _issue_oauth_state({"flow": _FLOW_LOGIN})
auth_url = msal_app.get_authorization_request_url(
scopes=msftAuthScopes,
redirect_uri=AUTH_REDIRECT_URI,
state=state_jwt,
prompt="select_account",
)
return RedirectResponse(auth_url)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error initiating Microsoft auth login: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to initiate Microsoft login: {str(e)}",
)
@router.get("/auth/login/callback")
async def auth_login_callback(
code: str, state: str, request: Request, response: Response
) -> HTMLResponse:
state_data = _parse_oauth_state(state)
if state_data.get("flow") != _FLOW_LOGIN:
raise HTTPException(status_code=400, detail="Invalid OAuth flow for this callback")
_require_msft_auth_config()
msal_app = msal.ConfidentialClientApplication(
AUTH_CLIENT_ID,
authority=AUTHORITY,
client_credential=AUTH_CLIENT_SECRET,
)
token_response = msal_app.acquire_token_by_authorization_code(
code,
scopes=msftAuthScopes,
redirect_uri=AUTH_REDIRECT_URI,
)
if "error" in token_response:
err = token_response.get("error_description", token_response.get("error"))
return HTMLResponse(
content=f"<html><body><h1>Authentication Failed</h1><p>{err}</p></body></html>",
status_code=400,
)
headers = {
"Authorization": f"Bearer {token_response['access_token']}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient() as client:
user_info_response = await client.get(
"https://graph.microsoft.com/v1.0/me", headers=headers
)
if user_info_response.status_code != 200:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get user info from Microsoft",
)
user_info = user_info_response.json()
rootInterface = getRootInterface()
user = rootInterface.getUserByUsername(user_info.get("userPrincipalName"))
if not user:
user = rootInterface.createUser(
username=user_info.get("userPrincipalName"),
email=user_info.get("mail"),
fullName=user_info.get("displayName"),
authenticationAuthority=AuthAuthority.MSFT,
externalId=user_info.get("id"),
externalUsername=user_info.get("userPrincipalName"),
externalEmail=user_info.get("mail"),
addExternalIdentityConnection=False,
)
jwt_token_data = {
"sub": user.username,
"userId": str(user.id),
"authenticationAuthority": AuthAuthority.MSFT.value,
}
jwt_token, jwt_expires_at = createAccessToken(jwt_token_data)
refresh_token_cookie, _refresh_expires = createRefreshToken(jwt_token_data)
payload = jose_jwt.decode(jwt_token, SECRET_KEY, algorithms=[ALGORITHM])
jti = payload.get("jti")
jwt_token_obj = Token(
id=jti,
userId=user.id,
authority=AuthAuthority.MSFT,
tokenPurpose=TokenPurpose.AUTH_SESSION,
tokenAccess=jwt_token,
tokenType="bearer",
expiresAt=jwt_expires_at.timestamp(),
createdAt=getUtcTimestamp(),
tokenRefresh="",
)
appInterface = getInterface(user)
appInterface.saveAccessToken(jwt_token_obj)
token_dict = jwt_token_obj.model_dump()
html_response = HTMLResponse(
content=f"""
<html>
<head><title>Authentication Successful</title></head>
<body>
<script>
if (window.opener) {{
window.opener.postMessage({{
type: 'msft_auth_success',
token_data: {json.dumps(token_dict)}
}}, '*');
}}
setTimeout(() => window.close(), 1000);
</script>
</body>
</html>
"""
)
setAccessTokenCookie(html_response, jwt_token, expiresDelta=None)
setRefreshTokenCookie(html_response, refresh_token_cookie)
return html_response
@router.get("/auth/connect")
@limiter.limit("5/minute")
def auth_connect(
request: Request,
connectionId: str = Query(..., description="UserConnection id"),
currentUser: User = Depends(getCurrentUser),
) -> RedirectResponse:
"""Start Microsoft Data OAuth for an existing connection."""
try:
_require_msft_data_config()
interface = getInterface(currentUser)
connections = interface.getUserConnections(currentUser.id)
connection = None
for conn in connections:
if conn.id == connectionId and conn.authority == AuthAuthority.MSFT:
connection = conn
break
if not connection:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Microsoft connection not found"
)
msal_app = msal.ConfidentialClientApplication(
DATA_CLIENT_ID,
authority=AUTHORITY,
client_credential=DATA_CLIENT_SECRET,
)
state_jwt = _issue_oauth_state(
{
"flow": _FLOW_CONNECT,
"connectionId": connectionId,
"userId": str(currentUser.id),
}
)
login_kwargs: Dict[str, Any] = {"prompt": "select_account", "state": state_jwt}
login_hint = connection.externalEmail or connection.externalUsername
if login_hint:
login_kwargs["login_hint"] = login_hint
if "@" in login_hint:
login_kwargs["domain_hint"] = login_hint.split("@", 1)[1]
login_kwargs["prompt"] = "login"
auth_url = msal_app.get_authorization_request_url(
scopes=msftDataScopes,
redirect_uri=DATA_REDIRECT_URI,
**login_kwargs,
)
return RedirectResponse(auth_url)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error initiating Microsoft connect: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to initiate Microsoft connect: {str(e)}",
)
@router.get("/auth/connect/callback")
async def auth_connect_callback(
code: str, state: str, request: Request, response: Response
) -> HTMLResponse:
state_data = _parse_oauth_state(state)
if state_data.get("flow") != _FLOW_CONNECT:
raise HTTPException(status_code=400, detail="Invalid OAuth flow for this callback")
connection_id = state_data.get("connectionId")
user_id = state_data.get("userId")
if not connection_id or not user_id:
raise HTTPException(status_code=400, detail="Missing connection or user in OAuth state")
_require_msft_data_config()
msal_app = msal.ConfidentialClientApplication(
DATA_CLIENT_ID,
authority=AUTHORITY,
client_credential=DATA_CLIENT_SECRET,
)
token_response = msal_app.acquire_token_by_authorization_code(
code,
scopes=msftDataScopes,
redirect_uri=DATA_REDIRECT_URI,
)
if "error" in token_response:
err = token_response.get("error_description", token_response.get("error"))
return HTMLResponse(
content=f"""
<html><body><script>
if (window.opener) {{
window.opener.postMessage({{ type: 'msft_connection_error', error: {json.dumps(err)} }}, '*');
setTimeout(() => window.close(), 1000);
}} else window.close();
</script></body></html>
""",
status_code=400,
)
headers = {
"Authorization": f"Bearer {token_response['access_token']}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient() as client:
user_info_response = await client.get(
"https://graph.microsoft.com/v1.0/me", headers=headers
)
if user_info_response.status_code != 200:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to get user info from Microsoft",
)
user_info = user_info_response.json()
scope_str = token_response.get("scope") or msftDataScopesForRefresh()
granted_list = scope_str.split() if isinstance(scope_str, str) else list(msftDataScopes)
rootInterface = getRootInterface()
user = rootInterface.getUser(user_id)
if not user:
return HTMLResponse(
content="""
<html><body><script>
if (window.opener) {
window.opener.postMessage({ type: 'msft_connection_error', error: 'User not found' }, '*');
setTimeout(() => window.close(), 1000);
} else window.close();
</script></body></html>
""",
status_code=404,
)
interface = getInterface(user)
connections = interface.getUserConnections(user_id)
connection = None
for conn in connections:
if conn.id == connection_id:
connection = conn
break
if not connection:
return HTMLResponse(
content="""
<html><body><script>
if (window.opener) {
window.opener.postMessage({ type: 'msft_connection_error', error: 'Connection not found' }, '*');
setTimeout(() => window.close(), 1000);
} else window.close();
</script></body></html>
""",
status_code=404,
)
try:
connection.status = ConnectionStatus.ACTIVE
connection.lastChecked = getUtcTimestamp()
connection.expiresAt = getUtcTimestamp() + token_response.get("expires_in", 0)
connection.externalId = user_info.get("id")
connection.externalUsername = user_info.get("userPrincipalName")
connection.externalEmail = user_info.get("mail")
connection.grantedScopes = (
granted_list if isinstance(granted_list, list) else list(msftDataScopes)
)
rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump())
token = Token(
userId=user.id,
authority=AuthAuthority.MSFT,
connectionId=connection_id,
tokenPurpose=TokenPurpose.DATA_CONNECTION,
tokenAccess=token_response["access_token"],
tokenRefresh=token_response.get("refresh_token", ""),
tokenType=token_response.get("token_type", "bearer"),
expiresAt=createExpirationTimestamp(token_response.get("expires_in", 0)),
createdAt=getUtcTimestamp(),
)
interface.saveConnectionToken(token)
return HTMLResponse(
content=f"""
<html>
<head><title>Connection Successful</title></head>
<body>
<script>
if (window.opener) {{
window.opener.postMessage({{
type: 'msft_connection_success',
connection: {{
id: '{connection.id}',
status: 'connected',
type: 'msftp',
lastChecked: {getUtcTimestamp()},
expiresAt: {createExpirationTimestamp(token_response.get("expires_in", 0))}
}}
}}, '*');
setTimeout(() => window.close(), 1000);
}} else {{
window.close();
}}
</script>
</body>
</html>
"""
)
except Exception as e:
logger.error(f"Error updating Microsoft connection: {str(e)}", exc_info=True)
return HTMLResponse(
content=f"""
<html><body><script>
if (window.opener) {{
window.opener.postMessage({{ type: 'msft_connection_error', error: {json.dumps(str(e))} }}, '*');
setTimeout(() => window.close(), 1000);
}} else window.close();
</script></body></html>
""",
status_code=500,
)
@router.get("/adminconsent")
@limiter.limit("5/minute")
def adminconsent(request: Request) -> RedirectResponse:
_require_msft_data_config()
admin_consent_redirect = _admin_consent_redirect_uri()
admin_consent_url = (
f"{AUTHORITY}/adminconsent"
f"?client_id={DATA_CLIENT_ID}"
f"&redirect_uri={quote(admin_consent_redirect, safe='')}"
)
logger.info(f"Redirecting to admin consent URL for tenant: {TENANT_ID}")
return RedirectResponse(admin_consent_url)
@router.get("/adminconsent/callback")
def adminconsent_callback(
admin_consent: Optional[str] = Query(None),
tenant: Optional[str] = Query(None),
error: Optional[str] = Query(None),
error_description: Optional[str] = Query(None),
request: Request = None,
) -> HTMLResponse:
"""Handle Microsoft Admin Consent callback"""
try:
if error:
logger.error(f"Admin consent error: {error} - {error_description}")
return HTMLResponse(
content=f"""
<html>
<head><title>Admin Consent Failed</title></head>
<body>
<h1>Admin Consent Failed</h1>
<p>Error: {error}</p>
<p>Description: {error_description or 'No description provided'}</p>
</body>
</html>
""",
status_code=400,
)
if admin_consent == "True" and tenant:
logger.info(f"Admin consent granted for tenant: {tenant}")
return HTMLResponse(
content=f"""
<html>
<head><title>Admin Consent Successful</title></head>
<body>
<h1>Admin Consent Successful</h1>
<p>Tenant: <strong>{tenant}</strong></p>
<script>setTimeout(() => window.close(), 3000);</script>
</body>
</html>
"""
)
return HTMLResponse(
content=f"""
<html><body>
<h1>Admin Consent Status</h1>
<p>Admin Consent: {admin_consent or 'Not provided'}</p>
<p>Tenant: {tenant or 'Not provided'}</p>
</body></html>
"""
)
except Exception as e:
logger.error(f"Error in admin consent callback: {str(e)}", exc_info=True)
return HTMLResponse(
content=f"<html><body><h1>Error</h1><p>{str(e)}</p></body></html>",
status_code=500,
)
@router.get("/me", response_model=User)
@limiter.limit("30/minute")
def get_current_user(
request: Request,
currentUser: User = Depends(getCurrentUser),
) -> User:
return currentUser
@router.post("/logout")
@limiter.limit("10/minute")
def logout(
request: Request,
currentUser: User = Depends(getCurrentUser),
) -> JSONResponse:
"""
End only the PowerOn gateway session (JWT + DB token row). Does not sign the user out of Microsoft
in the browser (no AAD logout redirect).
"""
try:
appInterface = getInterface(currentUser)
token = request.cookies.get("auth_token")
if not token:
auth_header = request.headers.get("Authorization")
if auth_header and auth_header.lower().startswith("bearer "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No token found",
)
try:
payload = jose_jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
session_id = payload.get("sid") or payload.get("sessionId")
jti = payload.get("jti")
except Exception as e:
logger.error(f"Failed to decode JWT on Microsoft logout: {str(e)}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token",
)
revoked = 0
if session_id:
revoked = appInterface.revokeTokensBySessionId(
session_id,
currentUser.id,
AuthAuthority.MSFT,
revokedBy=currentUser.id,
reason="logout",
)
elif jti:
appInterface.revokeTokenById(jti, revokedBy=currentUser.id, reason="logout")
revoked = 1
try:
from modules.shared.auditLogger import audit_logger
audit_logger.logUserAccess(
userId=str(currentUser.id),
mandateId="system",
action="logout",
successInfo=f"microsoft_gateway_logout revoked={revoked}",
ipAddress=request.client.host if request.client else None,
userAgent=request.headers.get("user-agent"),
success=True,
)
except Exception:
pass
json_response = JSONResponse(
{
"message": "Successfully logged out from application (Microsoft account stays signed in elsewhere)",
"revokedTokens": revoked,
}
)
clearAccessTokenCookie(json_response)
clearRefreshTokenCookie(json_response)
return json_response
except HTTPException:
raise
except Exception as e:
logger.error(f"Error during logout: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to logout: {str(e)}",
)
@router.post("/cleanup")
@limiter.limit("5/minute")
def cleanup_expired_tokens(
request: Request,
currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
try:
appInterface = getInterface(currentUser)
cleaned_count = appInterface.cleanupExpiredTokens()
return {"message": "Cleanup completed successfully", "tokens_cleaned": cleaned_count}
except Exception as e:
logger.error(f"Error cleaning up expired tokens: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to cleanup expired tokens: {str(e)}",
)
@router.post("/refresh")
@limiter.limit("10/minute")
async def refresh_token(
request: Request,
currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
try:
appInterface = getInterface(currentUser)
payload = {}
try:
payload = await request.json()
except Exception:
payload = {}
requested_connection_id = (
payload.get("connectionId") if isinstance(payload, dict) else None
)
connections = appInterface.getUserConnections(currentUser.id)
msft_connection = None
if requested_connection_id:
for conn in connections:
if conn.id == requested_connection_id and conn.authority == AuthAuthority.MSFT:
msft_connection = conn
break
if not msft_connection:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Requested Microsoft connection not found for current user",
)
else:
for conn in connections:
if conn.authority == AuthAuthority.MSFT:
msft_connection = conn
break
if not msft_connection:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No Microsoft connection found for current user",
)
current_token = TokenManager().getFreshToken(msft_connection.id)
if not current_token:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No Microsoft token found for this connection",
)
token_manager = TokenManager()
refreshed_token = token_manager.refreshToken(current_token)
if refreshed_token:
appInterface.saveConnectionToken(refreshed_token)
expires_at_val = parseTimestamp(refreshed_token.expiresAt)
if expires_at_val:
msft_connection.expiresAt = expires_at_val
msft_connection.lastChecked = getUtcTimestamp()
msft_connection.status = ConnectionStatus.ACTIVE
appInterface.db.recordModify(
UserConnection, msft_connection.id, msft_connection.model_dump()
)
current_time = getUtcTimestamp()
expires_at = parseTimestamp(refreshed_token.expiresAt)
expires_in = int(expires_at - current_time) if expires_at else 0
return {
"message": "Token refreshed successfully",
"expires_at": expires_at,
"expires_in_seconds": expires_in,
}
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to refresh token",
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error refreshing Microsoft token: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to refresh token: {str(e)}",
)