154 lines
5.1 KiB
Python
154 lines
5.1 KiB
Python
"""
Routes for Microsoft authentication.
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Request, Response, status, Depends
|
|
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
|
|
import logging
|
|
import json
|
|
from typing import Dict, Any, Optional
|
|
from datetime import datetime, timedelta
|
|
import msal
|
|
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.interfaces.serviceAppClass import getInterface
|
|
from modules.interfaces.serviceAppModel import AuthAuthority
|
|
from modules.interfaces.serviceAppTokens import MsftToken, saveToken
|
|
from modules.security.auth import getCurrentUser, limiter
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Router for the Microsoft authentication endpoints, mounted under /api/msft.
# The response table below supplies a default description per status code.
router = APIRouter(
    prefix="/api/msft",
    tags=["Security Microsoft"],
    responses={
        status_code: {"description": text}
        for status_code, text in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (500, "Internal server error"),
        )
    },
)
|
|
|
|
# Microsoft OAuth settings, read once from the application config at import.
CLIENT_ID = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
CLIENT_SECRET = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
# Falls back to the "common" tenant when no tenant id is configured.
TENANT_ID = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
REDIRECT_URI = APP_CONFIG.get("Service_MSFT_REDIRECT_URI")
# Authority URL handed to MSAL, derived from the tenant id above.
AUTHORITY = "https://login.microsoftonline.com/{}".format(TENANT_ID)
# Scopes requested for both the authorization URL and the token exchange.
SCOPES = ["Mail.ReadWrite", "User.Read"]
|
|
|
|
@router.get("/login")
@limiter.limit("5/minute")
async def login(request: Request = None):
    """Start the Microsoft sign-in flow by redirecting to the authorization URL.

    Builds an MSAL confidential client from the module-level OAuth settings,
    asks it for an authorization request URL covering ``SCOPES`` and
    ``REDIRECT_URI``, and redirects the browser there.

    Args:
        request: The incoming request, injected by FastAPI. The handler does
            not read it. It is declared because slowapi-style rate limiters
            locate the request by parameter name and refuse to decorate a
            handler without one (presumably ``limiter`` is such a limiter --
            confirm against modules.security.auth).

    Returns:
        A ``RedirectResponse`` pointing at the Microsoft authorization URL.

    Raises:
        HTTPException: 500 when the MSAL client or the URL cannot be built.
    """
    # NOTE(review): no ``state`` value is generated here, so the callback
    # cannot tie the response to this request (CSRF protection). MSAL's
    # initiate_auth_code_flow / acquire_token_by_auth_code_flow pair covers
    # this, but needs per-user storage shared between the two requests.
    try:
        msal_app = msal.ConfidentialClientApplication(
            CLIENT_ID,
            authority=AUTHORITY,
            client_credential=CLIENT_SECRET,
        )

        auth_url = msal_app.get_authorization_request_url(
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
        )

        return RedirectResponse(auth_url)

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("Error initiating Microsoft login: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to initiate Microsoft login: {str(e)}",
        ) from e
|
|
|
|
def _auth_failed_page():
    """Build the HTML page (status 400) shown when no token could be acquired."""
    return HTMLResponse(
        content="<html><body><h1>Authentication Failed</h1><p>Could not acquire token.</p></body></html>",
        status_code=400,
    )


def _json_for_script(value):
    """Serialize *value* as JSON that is safe to embed inside a <script> tag.

    json.dumps leaves "<" untouched, so a value containing "</script>" would
    end the script element early. "<" can only occur inside JSON strings,
    where the escape \\u003c decodes back to the same character.
    """
    return json.dumps(value).replace("<", "\\u003c")


@router.get("/auth/callback")
async def auth_callback(code: Optional[str] = None, request: Request = None):
    """Handle the Microsoft OAuth redirect and pass the tokens to the opener.

    Exchanges the authorization code for tokens through MSAL, stores the
    resulting token data with ``saveToken`` and returns a small HTML page that
    posts the token data to ``window.opener`` and then closes itself.

    Args:
        code: Authorization code from the query string. It is absent when
            Microsoft redirects back with an error (for example after the
            user cancels), in which case the failure page is returned.
        request: The incoming request, injected by FastAPI. Only used to log
            the ``error`` / ``error_description`` query parameters.

    Returns:
        An ``HTMLResponse``: the success page, or the failure page with
        status 400 when no access token could be obtained.

    Raises:
        HTTPException: 500 on any unexpected error during the exchange.
    """
    if not code:
        # Error redirects carry `error`/`error_description` instead of `code`.
        if request is not None:
            logger.warning(
                "Microsoft auth callback without code: error=%r description=%r",
                request.query_params.get("error"),
                request.query_params.get("error_description"),
            )
        else:
            logger.warning("Microsoft auth callback without code")
        return _auth_failed_page()

    try:
        msal_app = msal.ConfidentialClientApplication(
            CLIENT_ID,
            authority=AUTHORITY,
            client_credential=CLIENT_SECRET,
        )

        # NOTE(review): this looks like a blocking network call inside an
        # async handler; consider running it in a worker thread -- confirm.
        token_response = msal_app.acquire_token_by_authorization_code(
            code,
            scopes=SCOPES,
            redirect_uri=REDIRECT_URI,
        )

        # Treat a missing access token as failure too, so an unexpected
        # response shape yields the failure page rather than a KeyError.
        if "error" in token_response or "access_token" not in token_response:
            logger.error(
                "Microsoft token request failed: error=%r description=%r",
                token_response.get("error"),
                token_response.get("error_description"),
            )
            return _auth_failed_page()

        token_data = {
            "access_token": token_response["access_token"],
            "refresh_token": token_response.get("refresh_token", ""),
            "token_type": token_response.get("token_type", "bearer"),
            # Absolute expiry as a Unix timestamp (now + lifetime in seconds).
            "expires_at": datetime.now().timestamp() + token_response.get("expires_in", 0),
        }

        appInterface = getInterface()
        saveToken(appInterface, "Msft", token_data)

        access_token_js = _json_for_script(token_response["access_token"])
        token_data_js = _json_for_script(token_data)

        # NOTE(review): postMessage uses target origin '*', so any page that
        # opened this popup receives the access and refresh tokens. Restrict
        # it to the frontend's origin once that value is available here.
        return HTMLResponse(
            content=f"""
            <html>
                <head><title>Authentication Successful</title></head>
                <body>
                    <script>
                        if (window.opener) {{
                            window.opener.postMessage({{
                                type: 'msft_auth_success',
                                access_token: {access_token_js},
                                token_data: {token_data_js}
                            }}, '*');
                        }}
                        setTimeout(() => window.close(), 1000);
                    </script>
                </body>
            </html>
            """
        )

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("Error in auth callback: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Authentication failed: {str(e)}",
        ) from e
|
|
|
|
@router.post("/logout")
@limiter.limit("30/minute")
async def logout(
    currentUser: Dict[str, Any] = Depends(getCurrentUser),
    request: Request = None,
):
    """Log the current user out by revoking all of their sessions.

    Args:
        currentUser: The authenticated user, resolved by ``getCurrentUser``;
            its ``"id"`` entry is passed to ``revokeAllUserSessions``.
        request: The incoming request, injected by FastAPI. The handler does
            not read it. It is declared because slowapi-style rate limiters
            locate the request by parameter name and refuse to decorate a
            handler without one (presumably ``limiter`` is such a limiter --
            confirm against modules.security.auth).

    Returns:
        A ``JSONResponse`` with a confirmation message.

    Raises:
        HTTPException: 500 when revoking the sessions fails.
    """
    try:
        appInterface = getInterface()

        # Revoke every session belonging to this user.
        appInterface.revokeAllUserSessions(currentUser.get("id"))

        return JSONResponse({
            "message": "Successfully logged out from Microsoft"
        })

    except Exception as e:
        # logger.exception records the traceback as well as the message.
        logger.exception("Error during logout: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Logout failed: {str(e)}",
        ) from e
|