""" Routes for Microsoft authentication. """ from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Body, Query from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse import logging import json from typing import Dict, Any, Optional from datetime import datetime, timedelta import msal from modules.shared.configuration import APP_CONFIG from modules.interfaces.serviceAppClass import getInterface, getRootInterface from modules.interfaces.serviceAppModel import AuthAuthority, User, Token, ConnectionStatus, UserConnection from modules.security.auth import getCurrentUser, limiter from modules.shared.attributeUtils import ModelMixin # Configure logger logger = logging.getLogger(__name__) # Create router router = APIRouter( prefix="/api/msft", tags=["Security Microsoft"], responses={ 404: {"description": "Not found"}, 400: {"description": "Bad request"}, 401: {"description": "Unauthorized"}, 403: {"description": "Forbidden"}, 500: {"description": "Internal server error"} } ) # Microsoft OAuth configuration CLIENT_ID = APP_CONFIG.get("Service_MSFT_CLIENT_ID") CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET") TENANT_ID = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common") REDIRECT_URI = APP_CONFIG.get("Service_MSFT_REDIRECT_URI") AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}" SCOPES = ["Mail.ReadWrite", "User.Read"] @router.get("/login") @limiter.limit("5/minute") async def login( request: Request, state: str = Query("login", description="State parameter to distinguish between login and connection flows"), connectionId: Optional[str] = Query(None, description="Connection ID for connection flow") ) -> RedirectResponse: """Initiate Microsoft login""" try: # Create MSAL app msal_app = msal.ConfidentialClientApplication( CLIENT_ID, authority=AUTHORITY, client_credential=CLIENT_SECRET ) # Generate auth URL with state auth_url = msal_app.get_authorization_request_url( scopes=SCOPES, redirect_uri=REDIRECT_URI, state=json.dumps({ "type": state, "connectionId": connectionId }) ) return RedirectResponse(auth_url) except Exception as e: logger.error(f"Error initiating Microsoft login: {str(e)}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to initiate Microsoft login: {str(e)}" ) @router.get("/auth/callback") async def auth_callback(code: str, state: str, request: Request) -> HTMLResponse: """Handle Microsoft OAuth callback""" try: # Parse state state_data = json.loads(state) state_type = state_data.get("type", "login") connection_id = state_data.get("connectionId") # Create MSAL app msal_app = msal.ConfidentialClientApplication( CLIENT_ID, authority=AUTHORITY, client_credential=CLIENT_SECRET ) # Get token from code token_response = msal_app.acquire_token_by_authorization_code( code, scopes=SCOPES, redirect_uri=REDIRECT_URI ) if "error" in token_response: return HTMLResponse( content="
Could not acquire token.
", status_code=400 ) # Get user info from Microsoft user_info = msal_app.acquire_token_for_client(scopes=["User.Read"]) if "error" in user_info: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to get user info from Microsoft" ) if state_type == "login": # Handle login flow rootInterface = getRootInterface() user = rootInterface.getUserByUsername(user_info.get("preferred_username")) if not user: # Create new user if doesn't exist user = rootInterface.createUser( username=user_info.get("preferred_username"), email=user_info.get("email"), fullName=user_info.get("name"), authenticationAuthority=AuthAuthority.MSFT, externalId=user_info.get("id"), externalUsername=user_info.get("preferred_username"), externalEmail=user_info.get("email") ) # Create token token = Token( userId=user.id, authority=AuthAuthority.MSFT, tokenAccess=token_response["access_token"], tokenRefresh=token_response.get("refresh_token", ""), tokenType=token_response.get("token_type", "bearer"), expiresAt=datetime.now().timestamp() + token_response.get("expires_in", 0) ) # Save token appInterface = getInterface(user) appInterface.saveToken(token) # Return success page with token data return HTMLResponse( content=f"""