# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Routes for Microsoft authentication.

Exposes the OAuth2 authorization-code flow against Microsoft identity
(login and "connect an account" variants), the admin-consent callback, and
token housekeeping endpoints (logout, cleanup, refresh).
"""

from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Body, Query
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import html
import logging
import json
from typing import Dict, Any, Optional
import msal
import httpx

from modules.shared.configuration import APP_CONFIG
from modules.interfaces.interfaceDbAppObjects import getInterface, getRootInterface
from modules.datamodels.datamodelUam import AuthAuthority, User, ConnectionStatus, UserConnection
from modules.datamodels.datamodelSecurity import Token
from modules.auth import getCurrentUser, limiter
from modules.auth import createAccessToken, setAccessTokenCookie, createRefreshToken, setRefreshTokenCookie
from modules.shared.timeUtils import createExpirationTimestamp, getUtcTimestamp, parseTimestamp

# Configure logger
logger = logging.getLogger(__name__)

# Create router
router = APIRouter(
    prefix="/api/msft",
    tags=["Security Microsoft"],
    responses={
        404: {"description": "Not found"},
        400: {"description": "Bad request"},
        401: {"description": "Unauthorized"},
        403: {"description": "Forbidden"},
        500: {"description": "Internal server error"}
    }
)

# Microsoft OAuth configuration
CLIENT_ID = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
TENANT_ID = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
REDIRECT_URI = APP_CONFIG.get("Service_MSFT_REDIRECT_URI")
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"

# Validate configuration at module load
if not CLIENT_ID:
    logger.warning("Service_MSFT_CLIENT_ID is not configured")
if not CLIENT_SECRET:
    logger.warning("Service_MSFT_CLIENT_SECRET is not configured")
if not REDIRECT_URI:
    logger.warning("Service_MSFT_REDIRECT_URI is not configured")
if CLIENT_SECRET and CLIENT_SECRET.startswith(("PROD_ENC:", "INT_ENC:", "DEV_ENC:")):
    logger.warning("Service_MSFT_CLIENT_SECRET appears to be encrypted - ensure decryption is working")

SCOPES = [
    "Mail.ReadWrite",   # Read and write mail (user's mailbox only)
    "Mail.Send",        # Send mail (user's mailbox only)
    "User.Read",        # Read user profile
    "Sites.ReadWrite",  # Read and write user's SharePoint sites (not org-wide)
    "Files.ReadWrite"   # Read and write user's files (not org-wide)
]


def _escape_html(value: Any) -> str:
    """Return *value* as text that is safe to embed in an HTML response body."""
    return html.escape(str(value), quote=True)


def _create_msal_app() -> "msal.ConfidentialClientApplication":
    """Build the MSAL confidential client from the module-level configuration."""
    return msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET
    )


def _build_state_param(state: str, connectionId: Optional[str]) -> str:
    """Return the OAuth ``state`` value to send to Microsoft.

    A caller may pass a pre-encoded JSON object, which is forwarded unchanged.
    Anything else (a plain flow name such as ``"login"``, or JSON that is not
    an object) is wrapped, because the callback reads the state with ``.get``
    and therefore needs an object.
    """
    try:
        parsed = json.loads(state)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return state
    return json.dumps({
        "type": state,
        "connectionId": connectionId
    })


def _build_login_kwargs(connectionId: Optional[str]) -> Dict[str, Any]:
    """Return extra authorization-request parameters (hints and prompt).

    With a known connection that has a stored external e-mail/username, the
    request is pinned to that account (``login_hint``, ``domain_hint``,
    ``prompt=login``). In every other case the account picker is shown.
    """
    if not connectionId:
        # Generic login/connect flow: allow choosing account
        return {"prompt": "select_account"}

    try:
        rootInterface = getRootInterface()
        # Fetch the connection by ID directly
        records = rootInterface.db.getRecordset(UserConnection, recordFilter={"id": connectionId})
    except Exception as e:
        # Best effort only: a failed lookup must not block the login itself
        logger.warning(f"Could not load connection {connectionId} for login hint: {str(e)}")
        return {"prompt": "select_account"}

    if not records:
        # Fall back to default behavior if connection not found
        return {"prompt": "select_account"}

    record = records[0]
    login_hint = record.get("externalEmail") or record.get("externalUsername")
    if not login_hint:
        # No account to target, so forcing re-auth has nothing to aim at
        return {"prompt": "select_account"}

    login_kwargs: Dict[str, Any] = {"login_hint": login_hint}
    # Derive domain hint from email/UPN
    if "@" in login_hint:
        login_kwargs["domain_hint"] = login_hint.split("@", 1)[1]
    # When targeting a specific account, avoid account picker
    login_kwargs["prompt"] = "login"  # force re-auth for that account only
    return login_kwargs


@router.get("/login")
@limiter.limit("5/minute")
async def login(
    request: Request,
    state: str = Query("login", description="State parameter to distinguish between login and connection flows"),
    connectionId: Optional[str] = Query(None, description="Connection ID for connection flow")
) -> RedirectResponse:
    """Initiate Microsoft login.

    Builds the Microsoft authorization URL and redirects the browser to it.

    Raises:
        HTTPException: 500 when the authorization URL cannot be built.
    """
    try:
        msal_app = _create_msal_app()
        state_param = _build_state_param(state, connectionId)
        login_kwargs = _build_login_kwargs(connectionId)

        # MSAL automatically adds openid, profile, offline_access - we just need to provide our business scopes
        auth_url = msal_app.get_authorization_request_url(
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
            state=state_param,
            **login_kwargs
        )
        return RedirectResponse(auth_url)

    except Exception as e:
        logger.error(f"Error initiating Microsoft login: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to initiate Microsoft login: {str(e)}"
        )


# NOTE(review): the page bodies returned by the callbacks below are reproduced
# as plain text exactly as found; confirm whether surrounding markup or
# scripts are expected by the frontend.

@router.get("/adminconsent/callback")
async def adminconsent_callback(
    admin_consent: Optional[str] = Query(None),
    tenant: Optional[str] = Query(None),
    error: Optional[str] = Query(None),
    error_description: Optional[str] = Query(None),
    request: Request = None
) -> HTMLResponse:
    """Handle Microsoft Admin Consent callback.

    All query parameters come from the browser and are HTML-escaped before
    being placed into the response page.
    """
    try:
        if error:
            logger.error(f"Admin consent error: {error} - {error_description}")
            return HTMLResponse(
                content=(
                    " Admin Consent Failed\n\n"
                    "Admin Consent Failed\n\n"
                    f"Error: {_escape_html(error)}\n\n"
                    f"Description: {_escape_html(error_description or 'No description provided')}\n\n"
                    "Please contact your administrator.\n\n"
                ),
                status_code=400
            )

        if admin_consent == "True" and tenant:
            logger.info(f"Admin consent granted for tenant: {tenant}")
            return HTMLResponse(
                content=(
                    " Admin Consent Successful\n\n"
                    "Admin Consent Successful\n\n"
                    f"The application has been granted admin consent for tenant: {_escape_html(tenant)}\n\n"
                    "All users in this tenant can now use the application without individual consent.\n\n"
                    "You can close this window.\n\n"
                )
            )

        logger.warning(
            f"Admin consent callback received unexpected parameters: admin_consent={admin_consent}, tenant={tenant}"
        )
        return HTMLResponse(
            content=(
                " Admin Consent Status\n\n"
                "Admin Consent Status\n\n"
                f"Admin Consent: {_escape_html(admin_consent or 'Not provided')}\n\n"
                f"Tenant: {_escape_html(tenant or 'Not provided')}\n\n"
            )
        )
    except Exception as e:
        logger.error(f"Error in admin consent callback: {str(e)}", exc_info=True)
        return HTMLResponse(
            content=(
                " Admin Consent Error\n\n"
                "Error Processing Admin Consent\n\n"
                f"{_escape_html(str(e))}\n\n"
            ),
            status_code=500
        )


def _token_error_response(token_response: Dict[str, Any]) -> HTMLResponse:
    """Log a failed code-for-token exchange and build the 400 error page."""
    error_code = token_response.get('error')
    error_description = token_response.get('error_description', 'No description provided')
    # CLIENT_ID may be unconfigured (None); slicing it directly would raise
    # and hide the actual token error.
    client_id_prefix = (CLIENT_ID or "")[:8]
    logger.error(
        f"Token acquisition failed: {error_code} - {error_description} | "
        f"CLIENT_ID: {client_id_prefix}... | "
        f"REDIRECT_URI: {REDIRECT_URI} | "
        f"TENANT_ID: {TENANT_ID}"
    )

    # Provide more helpful error message based on error code
    if error_code == "invalid_client":
        error_msg = "Invalid client credentials. Please check CLIENT_ID and CLIENT_SECRET configuration."
    elif error_code == "invalid_grant":
        error_msg = "Invalid authorization code or redirect URI mismatch."
    else:
        error_msg = f"Authentication failed: {error_description or error_code}"

    return HTMLResponse(
        content=(
            " Authentication Failed\n\n"
            "Authentication Failed\n\n"
            f"{_escape_html(error_msg)}\n\n"
            f"Error code: {_escape_html(error_code)}\n\n"
            "Please contact support if this issue persists.\n\n"
        ),
        status_code=400
    )


def _complete_login_flow(user_info: Dict[str, Any], token_response: Dict[str, Any]) -> HTMLResponse:
    """Sign the Microsoft user in locally and return the success page.

    Looks the user up by ``userPrincipalName`` (creating it when missing),
    stores the Microsoft token and a local JWT, and sets the access/refresh
    cookies on the returned response.
    """
    rootInterface = getRootInterface()
    principal_name = user_info.get("userPrincipalName")
    user = rootInterface.getUserByUsername(principal_name)

    if not user:
        logger.info(f"Creating new user for {user_info.get('userPrincipalName')}")
        # Create new user if doesn't exist
        user = rootInterface.createUser(
            username=principal_name,
            email=user_info.get("mail"),
            fullName=user_info.get("displayName"),
            authenticationAuthority=AuthAuthority.MSFT,
            externalId=user_info.get("id"),
            externalUsername=principal_name,
            externalEmail=user_info.get("mail")
        )

    # Microsoft token, stored against the local user (no connectionId)
    token = Token(
        userId=user.id,
        authority=AuthAuthority.MSFT,
        tokenAccess=token_response["access_token"],
        tokenRefresh=token_response.get("refresh_token", ""),
        tokenType=token_response.get("token_type", "bearer"),
        expiresAt=createExpirationTimestamp(token_response.get("expires_in", 0)),
        createdAt=getUtcTimestamp()
    )
    appInterface = getInterface(user)
    appInterface.saveAccessToken(token)

    # Local session tokens
    jwt_token_data = {
        "sub": user.username,
        "mandateId": str(user.mandateId),
        "userId": str(user.id),
        "authenticationAuthority": AuthAuthority.MSFT.value
    }
    jwt_token, jwt_expires_at = createAccessToken(jwt_token_data)
    refresh_jwt, _refresh_expires = createRefreshToken(jwt_token_data)

    # Decode token to get jti for database record
    from jose import jwt
    from modules.auth import SECRET_KEY, ALGORITHM
    payload = jwt.decode(jwt_token, SECRET_KEY, algorithms=[ALGORITHM])
    jti = payload.get("jti")

    # Create JWT token record with matching id
    jwt_token_obj = Token(
        id=jti,
        userId=user.id,
        authority=AuthAuthority.MSFT,
        tokenAccess=jwt_token,
        tokenType="bearer",
        expiresAt=jwt_expires_at.timestamp(),
        createdAt=getUtcTimestamp(),
        mandateId=str(user.mandateId)
    )
    appInterface.saveAccessToken(jwt_token_obj)

    # NOTE(review): token_dict is not referenced by the page body visible
    # here - confirm whether the response template is meant to embed it.
    token_dict = jwt_token_obj.model_dump()

    html_response = HTMLResponse(
        content=""" Authentication Successful """
    )

    # Set access token as httpOnly cookie (like local login)
    # HTMLResponse inherits from Response, so we can set cookies directly on it
    setAccessTokenCookie(html_response, jwt_token, expiresDelta=None)

    # Set refresh token as httpOnly cookie
    setRefreshTokenCookie(html_response, refresh_jwt)

    return html_response


def _complete_connection_flow(
    connection_id: Optional[str],
    user_id: Optional[str],
    user_info: Dict[str, Any],
    token_response: Dict[str, Any]
) -> HTMLResponse:
    """Attach the Microsoft account to an existing user connection.

    Raises:
        HTTPException: 400 when the connection id or user id is missing.
    """
    # NOTE(review): user_id and connection_id are taken from the OAuth state,
    # which is not signed or bound to a session here - confirm that callers
    # cannot forge a state naming another user's connection.
    if not connection_id or not user_id:
        logger.error("Connection ID or User ID is missing in connection flow")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Connection ID and User ID are required for connection flow"
        )

    # Get user directly by ID
    rootInterface = getRootInterface()
    user = rootInterface.getUser(user_id)
    if not user:
        logger.error(f"User {user_id} not found in database")
        return HTMLResponse(
            content=""" Connection Failed """,
            status_code=404
        )

    # Get the connection from the connections table
    interface = getInterface(user)
    connection = next(
        (conn for conn in interface.getUserConnections(user_id) if conn.id == connection_id),
        None
    )
    if not connection:
        logger.error(f"Connection {connection_id} not found in user's connections")
        return HTMLResponse(
            content=""" Connection Failed """,
            status_code=404
        )
    logger.info(f"Found existing connection for user {user.username}")

    try:
        logger.info(f"Updating connection {connection_id} for user {user.username}")
        # Update connection with external service details
        connection.status = ConnectionStatus.ACTIVE
        connection.lastChecked = getUtcTimestamp()
        connection.expiresAt = getUtcTimestamp() + token_response.get("expires_in", 0)
        connection.externalId = user_info.get("id")
        connection.externalUsername = user_info.get("userPrincipalName")
        connection.externalEmail = user_info.get("mail")

        # Update connection record directly
        rootInterface.db.recordModify(UserConnection, connection_id, connection.model_dump())

        # Save token, linked to this specific connection
        token = Token(
            userId=user.id,
            authority=AuthAuthority.MSFT,
            connectionId=connection_id,
            tokenAccess=token_response["access_token"],
            tokenRefresh=token_response.get("refresh_token", ""),
            tokenType=token_response.get("token_type", "bearer"),
            expiresAt=createExpirationTimestamp(token_response.get("expires_in", 0)),
            createdAt=getUtcTimestamp()
        )
        interface.saveConnectionToken(token)

        return HTMLResponse(
            content=""" Connection Successful """
        )
    except Exception as e:
        logger.error(f"Error updating connection or saving token: {str(e)}", exc_info=True)
        return HTMLResponse(
            content=""" Connection Failed """,
            status_code=500
        )


@router.get("/auth/callback")
async def auth_callback(code: str, state: str, request: Request, response: Response) -> HTMLResponse:
    """Handle Microsoft OAuth callback.

    Exchanges the authorization code for tokens, reads the user's profile
    from Microsoft Graph, then completes either the login flow (state type
    ``"login"``) or the connection flow (any other type). Every failure,
    including an HTTPException raised on the way, is reported as an HTML
    page with status 500.
    """
    try:
        # Parse state
        state_data = json.loads(state)
        if not isinstance(state_data, dict):
            raise ValueError("OAuth state must be a JSON object")
        state_type = state_data.get("type", "login")
        connection_id = state_data.get("connectionId")
        user_id = state_data.get("userId")

        logger.info(
            f"Processing Microsoft auth callback: state_type={state_type}, "
            f"connection_id={connection_id}, user_id={user_id}"
        )

        # Get token from code - MSAL automatically handles the required scopes
        msal_app = _create_msal_app()
        token_response = msal_app.acquire_token_by_authorization_code(
            code,
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI
        )
        if "error" in token_response:
            return _token_error_response(token_response)

        # Get user info using the access token
        headers = {
            'Authorization': f"Bearer {token_response['access_token']}",
            'Content-Type': 'application/json'
        }
        async with httpx.AsyncClient() as client:
            user_info_response = await client.get(
                "https://graph.microsoft.com/v1.0/me",
                headers=headers
            )
            if user_info_response.status_code != 200:
                logger.error(f"Failed to get user info: {user_info_response.text}")
                raise HTTPException(
                    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                    detail="Failed to get user info from Microsoft"
                )
            user_info = user_info_response.json()
        logger.info(f"Got user info from Microsoft: {user_info.get('userPrincipalName')}")

        if state_type == "login":
            return _complete_login_flow(user_info, token_response)
        return _complete_connection_flow(connection_id, user_id, user_info, token_response)

    except Exception as e:
        logger.error(f"Error in auth callback: {str(e)}", exc_info=True)
        return HTMLResponse(
            content=""" Authentication Failed """,
            status_code=500
        )


@router.get("/me", response_model=User)
@limiter.limit("30/minute")
async def get_current_user(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> User:
    """Get current user information.

    The user is resolved by the ``getCurrentUser`` dependency; this handler
    only returns it.
    """
    return currentUser


@router.post("/logout")
@limiter.limit("10/minute")
async def logout(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Logout current user.

    Raises:
        HTTPException: 500 when the logout call fails.
    """
    try:
        appInterface = getInterface(currentUser)
        appInterface.logout()

        # Log successful logout
        try:
            from modules.shared.auditLogger import audit_logger
            audit_logger.logUserAccess(
                userId=str(currentUser.id),
                mandateId=str(currentUser.mandateId),
                action="logout",
                successInfo="microsoft_auth_logout"
            )
        except Exception as audit_error:
            # Don't fail if audit logging fails, but leave a trace of it
            logger.warning(f"Audit logging for logout failed: {str(audit_error)}")

        return {"message": "Logged out successfully"}

    except Exception as e:
        logger.error(f"Error during logout: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to logout: {str(e)}"
        )


@router.post("/cleanup")
@limiter.limit("5/minute")
async def cleanup_expired_tokens(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Clean up expired tokens for the current user.

    Returns:
        A message plus ``tokens_cleaned``, the value returned by the
        interface's ``cleanupExpiredTokens``.

    Raises:
        HTTPException: 500 when the cleanup fails.
    """
    try:
        appInterface = getInterface(currentUser)
        cleaned_count = appInterface.cleanupExpiredTokens()
        return {
            "message": "Cleanup completed successfully",
            "tokens_cleaned": cleaned_count
        }

    except Exception as e:
        logger.error(f"Error cleaning up expired tokens: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to cleanup expired tokens: {str(e)}"
        )


@router.post("/refresh")
@limiter.limit("10/minute")
async def refresh_token(
    request: Request,
    currentUser: User = Depends(getCurrentUser)
) -> Dict[str, Any]:
    """Refresh Microsoft OAuth token for current user.

    An optional JSON body ``{"connectionId": ...}`` selects the connection;
    without it the user's first Microsoft connection is used.

    Raises:
        HTTPException: 404 when no matching connection or token exists,
            500 when the refresh fails.
    """
    try:
        appInterface = getInterface(currentUser)

        # Optional: use provided connectionId to target a specific connection.
        # The body is optional, so an absent or unparsable body means "none".
        try:
            payload = await request.json()
        except Exception:
            payload = {}
        requested_connection_id = payload.get("connectionId") if isinstance(payload, dict) else None

        # Find Microsoft connection for this user
        logger.debug(f"Looking for Microsoft connection for user {currentUser.id}")
        msft_connections = [
            conn for conn in appInterface.getUserConnections(currentUser.id)
            if conn.authority == AuthAuthority.MSFT
        ]

        if requested_connection_id:
            # Validate the requested connection belongs to current user and is MSFT
            msft_connection = next(
                (conn for conn in msft_connections if conn.id == requested_connection_id),
                None
            )
            if not msft_connection:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="Requested Microsoft connection not found for current user"
                )
        else:
            # Fallback: first MSFT connection
            msft_connection = msft_connections[0] if msft_connections else None
            if not msft_connection:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail="No Microsoft connection found for current user"
                )

        logger.debug(f"Found Microsoft connection: {msft_connection.id}, status={msft_connection.status}")

        from modules.auth import TokenManager
        token_manager = TokenManager()

        # Get a fresh token via TokenManager convenience method
        current_token = token_manager.getFreshToken(msft_connection.id)
        if not current_token:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="No Microsoft token found for this connection"
            )

        # Always attempt refresh, even when the stored token is still valid
        refreshedToken = token_manager.refreshToken(current_token)
        if not refreshedToken:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to refresh token"
            )

        # Save the new connection token
        appInterface.saveConnectionToken(refreshedToken)

        # Update the connection's expiration time and health markers
        expiresAt = parseTimestamp(refreshedToken.expiresAt)
        if expiresAt:
            msft_connection.expiresAt = expiresAt
        msft_connection.lastChecked = getUtcTimestamp()
        msft_connection.status = ConnectionStatus.ACTIVE
        appInterface.db.recordModify(UserConnection, msft_connection.id, msft_connection.model_dump())

        # Calculate time until expiration
        currentTime = getUtcTimestamp()
        expiresIn = int(expiresAt - currentTime) if expiresAt else 0

        return {
            "message": "Token refreshed successfully",
            "expires_at": expiresAt,
            "expires_in_seconds": expiresIn
        }

    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error refreshing Microsoft token: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to refresh token: {str(e)}"
        )