gateway/modules/routes/routeSecurityMsft.py
2025-05-28 01:51:10 +02:00

166 lines
5.5 KiB
Python

"""
Routes for Microsoft authentication.
"""
from fastapi import APIRouter, HTTPException, Request, Response, status, Depends, Body
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import logging
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import msal
from modules.shared.configuration import APP_CONFIG
from modules.interfaces.serviceAppClass import getInterface, getRootInterface
from modules.interfaces.serviceAppModel import AuthAuthority, User
from modules.interfaces.serviceAppTokens import MsftToken
from modules.security.auth import getCurrentUser, limiter
# Configure logger
logger = logging.getLogger(__name__)
# Router for the Microsoft security endpoints; every route in this module is
# registered on it and therefore served under the /api/msft prefix.
_DOCUMENTED_ERRORS = (
    (404, "Not found"),
    (400, "Bad request"),
    (401, "Unauthorized"),
    (403, "Forbidden"),
    (500, "Internal server error"),
)
router = APIRouter(
    prefix="/api/msft",
    tags=["Security Microsoft"],
    # Status code -> response description, shared by all routes of this router.
    responses={
        status_code: {"description": text}
        for status_code, text in _DOCUMENTED_ERRORS
    },
)
# Microsoft OAuth settings, looked up in APP_CONFIG when this module is imported.
# Only the tenant lookup passes a fallback ("common"); the others pass none.
_MSFT_LOGIN_BASE = "https://login.microsoftonline.com"

CLIENT_ID = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
TENANT_ID = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
REDIRECT_URI = APP_CONFIG.get("Service_MSFT_REDIRECT_URI")

# Authority URL handed to MSAL: the login host followed by the tenant.
AUTHORITY = f"{_MSFT_LOGIN_BASE}/{TENANT_ID}"

# Scopes requested both when building the login URL and when redeeming the code.
SCOPES = ["Mail.ReadWrite", "User.Read"]
@router.get("/login")
@limiter.limit("5/minute")
async def login(request: Request) -> RedirectResponse:
    """Start the Microsoft login by redirecting to the authorization URL.

    Builds an MSAL confidential client from the module-level OAuth settings,
    asks it for the authorization request URL for ``SCOPES`` and
    ``REDIRECT_URI``, and redirects the caller there.

    Args:
        request: Incoming request. Not used in the body; presumably required
            by the rate-limit decorator (confirm).

    Returns:
        A redirect response pointing at the Microsoft authorization URL.

    Raises:
        HTTPException: 500 if the client or the URL cannot be created. The
            underlying error is logged with its traceback but is not included
            in the response, so internal details are not exposed to clients.
    """
    try:
        msal_app = msal.ConfidentialClientApplication(
            CLIENT_ID,
            authority=AUTHORITY,
            client_credential=CLIENT_SECRET,
        )
        # NOTE(review): no ``state`` value is sent, so the callback cannot tie
        # the response to this request (CSRF protection). Adding it needs
        # per-user storage that is not visible from this module.
        auth_url = msal_app.get_authorization_request_url(
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
        )
        return RedirectResponse(auth_url)
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Error initiating Microsoft login: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to initiate Microsoft login",
        ) from e
def _json_for_script(value: Any) -> str:
    """Serialize *value* as JSON that is safe to inline in a ``<script>`` element.

    ``json.dumps`` leaves ``<``, ``>`` and ``&`` as they are, so a string value
    containing ``</script>`` would terminate the script element early. The
    three characters are swapped for their JSON unicode escapes, which
    JavaScript decodes back to the same characters. They can only occur inside
    JSON strings, so the structure of the document is unaffected.
    """
    return (
        json.dumps(value)
        .replace("<", "\\u003c")
        .replace(">", "\\u003e")
        .replace("&", "\\u0026")
    )


def _exchange_code_for_token(code: str) -> Dict[str, Any]:
    """Build the MSAL client and redeem *code* for a token response (synchronous)."""
    msal_app = msal.ConfidentialClientApplication(
        CLIENT_ID,
        authority=AUTHORITY,
        client_credential=CLIENT_SECRET,
    )
    return msal_app.acquire_token_by_authorization_code(
        code,
        scopes=SCOPES,
        redirect_uri=REDIRECT_URI,
    )


@router.get("/auth/callback")
async def auth_callback(code: str, request: Request) -> HTMLResponse:
    """Handle the Microsoft OAuth callback.

    Redeems the authorization code for a token, stores the token through the
    application interface, and returns a small HTML page that posts the token
    to the opening window and then closes itself.

    Args:
        code: Authorization code taken from the callback query string.
            NOTE(review): the parameter is required, so a callback that only
            carries ``error``/``error_description`` is presumably rejected by
            request validation before this function runs - confirm.
        request: Incoming request. Not used in the body.

    Returns:
        A 400 HTML page when no access token could be acquired, otherwise the
        success page described above.

    Raises:
        HTTPException: 500 on any unexpected failure. The underlying error is
            logged with its traceback but is not included in the response.
    """
    # Local import so this block does not depend on the module's import list.
    from fastapi.concurrency import run_in_threadpool

    try:
        # The MSAL calls are synchronous; run them in a worker thread so the
        # event loop is not blocked while the token endpoint is contacted.
        token_response = await run_in_threadpool(_exchange_code_for_token, code)

        # Treat a response without an access token as a failure too, instead
        # of hitting a KeyError further down.
        if "error" in token_response or "access_token" not in token_response:
            logger.error(
                "Microsoft token request failed: %s (%s)",
                token_response.get("error"),
                token_response.get("error_description"),
            )
            return HTMLResponse(
                content="<html><body><h1>Authentication Failed</h1><p>Could not acquire token.</p></body></html>",
                status_code=400,
            )

        access_token = token_response["access_token"]
        token_data = {
            "access_token": access_token,
            "refresh_token": token_response.get("refresh_token", ""),
            "token_type": token_response.get("token_type", "bearer"),
            # Absolute expiry: current epoch seconds plus the reported lifetime.
            "expires_at": datetime.now().timestamp() + token_response.get("expires_in", 0),
        }

        # NOTE(review): getInterface() is called without a user argument here;
        # confirm which account the token ends up stored under.
        appInterface = getInterface()
        appInterface.saveToken("Msft", token_data)

        access_token_js = _json_for_script(access_token)
        token_data_js = _json_for_script(token_data)

        # NOTE(review): the postMessage target origin '*' hands the token to
        # whatever origin opened this window. Restrict it to the frontend
        # origin once that value is available in configuration.
        return HTMLResponse(
            content=f"""
<html>
<head><title>Authentication Successful</title></head>
<body>
<script>
if (window.opener) {{
    window.opener.postMessage({{
        type: 'msft_auth_success',
        access_token: {access_token_js},
        token_data: {token_data_js}
    }}, '*');
}}
setTimeout(() => window.close(), 1000);
</script>
</body>
</html>
"""
        )
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Error in auth callback: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Authentication failed",
        ) from e
@router.get("/me", response_model=User)
@limiter.limit("30/minute")
async def get_current_user(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
) -> User:
    """Return the user resolved by the ``getCurrentUser`` dependency.

    The body only hands back the injected value, so there is nothing here
    that can raise; the former try/except around the return was unreachable
    and has been removed.

    Args:
        request: Incoming request. Not used in the body; presumably required
            by the rate-limit decorator (confirm).
        currentUser: User injected through ``Depends(getCurrentUser)``.

    Returns:
        The injected user, unchanged.
    """
    return currentUser
@router.post("/logout")
@limiter.limit("10/minute")
async def logout(
    request: Request,
    currentUser: User = Depends(getCurrentUser),
) -> Dict[str, Any]:
    """Log out the current user.

    Obtains the application interface for ``currentUser`` and calls its
    ``logout()`` method.

    Args:
        request: Incoming request. Not used in the body; presumably required
            by the rate-limit decorator (confirm).
        currentUser: User injected through ``Depends(getCurrentUser)``.

    Returns:
        ``{"message": "Logged out successfully"}`` when the call completes.

    Raises:
        HTTPException: An HTTPException raised while logging out is passed
            through unchanged; any other failure becomes a 500. The underlying
            error is logged with its traceback but is not included in the
            response.
    """
    try:
        appInterface = getInterface(currentUser)
        appInterface.logout()
        return {"message": "Logged out successfully"}
    except HTTPException:
        # Keep the status code chosen further down instead of masking it as 500.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception("Error during logout: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to logout",
        ) from e