# gateway/routes/routeMsft.py
# 2025-05-07 02:08:09 +02:00
#
# 405 lines
# 15 KiB
# Python

from fastapi import APIRouter, HTTPException, Depends, Request, Response, status, Cookie
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
import msal
import os
import logging
import sys
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from modules.auth import getCurrentActiveUser, getUserContext
from modules.configuration import APP_CONFIG
from modules.lucydomInterface import getLucydomInterface
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Router that groups the Microsoft authentication endpoints under /api/msft.
# The response table below is attached to every route registered on it.
router = APIRouter(
    prefix="/api/msft",
    tags=["Microsoft"],
    responses={
        http_code: {"description": description}
        for http_code, description in (
            (404, "Not found"),
            (400, "Bad request"),
            (401, "Unauthorized"),
            (403, "Forbidden"),
            (500, "Internal server error"),
        )
    },
)
# Azure AD settings, read once from the application configuration at import time
CLIENT_ID = APP_CONFIG.get("Agent_Mail_MSFT_CLIENT_ID")
CLIENT_SECRET = APP_CONFIG.get("Agent_Mail_MSFT_CLIENT_SECRET")
# Falls back to the multi-tenant 'common' endpoint when no tenant is configured
TENANT_ID = APP_CONFIG.get("Agent_Mail_MSFT_TENANT_ID", "common")
AUTHORITY = "https://login.microsoftonline.com/{}".format(TENANT_ID)
# Graph permissions requested during login and token refresh
SCOPES = ["Mail.ReadWrite", "User.Read"]
REDIRECT_URI = APP_CONFIG.get("Agent_Mail_MSFT_REDIRECT_URI")

# Settings bundle consumed wherever an MSAL client is constructed in this module
app_config = dict(
    client_id=CLIENT_ID,
    client_credential=CLIENT_SECRET,
    authority=AUTHORITY,
    redirect_uri=REDIRECT_URI,
)
# Simple file-based token storage: one JSON file per user inside TOKEN_DIR.
TOKEN_DIR = './token_storage'
if not os.path.exists(TOKEN_DIR):
    # exist_ok=True keeps import from crashing when another worker creates the
    # directory between the check above and this call. The directory holds
    # access/refresh tokens, so it is created owner-only (subject to umask).
    os.makedirs(TOKEN_DIR, mode=0o700, exist_ok=True)
    logger.info(f"Created token storage directory: {TOKEN_DIR}")
def save_token_to_file(user_id: str, token_data: Dict[str, Any]):
    """Persist a user's token data as JSON in TOKEN_DIR.

    The file is named ``<user_id>.json`` and is created readable/writable by
    the owner only, because it contains access and refresh tokens.

    Args:
        user_id: Identifier used as the file stem.
        token_data: JSON-serialisable token payload.

    Raises:
        ValueError: If ``user_id`` is empty or contains a path separator or
            NUL byte, i.e. could address a file outside TOKEN_DIR.
    """
    file_stem = str(user_id)
    # The ID becomes part of a file path; refuse anything that could escape
    # the storage directory instead of silently writing elsewhere.
    if not file_stem or any(ch in file_stem for ch in ("/", "\\", "\0")):
        raise ValueError(f"Unsafe user id for token storage: {user_id!r}")

    filename = os.path.join(TOKEN_DIR, f"{file_stem}.json")
    # os.open lets us set restrictive permissions at creation time.
    fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, 'w') as f:
        json.dump(token_data, f)
    logger.info(f"Token saved for user: {user_id}")
def load_token_from_file(user_id: str) -> Optional[Dict[str, Any]]:
    """Load a user's token data from ``<user_id>.json`` in TOKEN_DIR.

    Args:
        user_id: Identifier used as the file stem.

    Returns:
        The parsed JSON content, or None when no token file exists for the
        user or when ``user_id`` is empty / contains a path separator or NUL
        byte (such an ID can never name a file inside TOKEN_DIR).
    """
    file_stem = str(user_id)
    # The ID may originate from client input (e.g. a cookie); never let it
    # address a file outside the storage directory.
    if not file_stem or any(ch in file_stem for ch in ("/", "\\", "\0")):
        return None

    filename = os.path.join(TOKEN_DIR, f"{file_stem}.json")
    # Open directly instead of exists()+open(): the file could disappear
    # between the two calls.
    try:
        with open(filename, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return None
def get_user_info_from_token(access_token: str, timeout: float = 10.0) -> Optional[Dict[str, Any]]:
    """Fetch the signed-in user's profile from Microsoft Graph (``/v1.0/me``).

    Args:
        access_token: Bearer token sent in the Authorization header.
        timeout: Seconds to wait for Graph before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        A dict with ``name`` (displayName), ``email`` (userPrincipalName) and
        ``id``, or None when the token is empty, Graph answers with a non-200
        status, or the request fails.
    """
    if not access_token:
        # Nothing to authenticate with; skip the network round-trip.
        return None

    import requests
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json'
    }
    try:
        response = requests.get(
            'https://graph.microsoft.com/v1.0/me',
            headers=headers,
            timeout=timeout
        )
        if response.status_code == 200:
            user_data = response.json()
            return {
                "name": user_data.get("displayName", ""),
                "email": user_data.get("userPrincipalName", ""),
                "id": user_data.get("id", "")
            }
        logger.error(f"Error getting user info: {response.status_code} - {response.text}")
        return None
    except Exception as e:
        # Best-effort lookup: network, timeout and JSON errors all mean "no info".
        logger.error(f"Exception getting user info: {str(e)}")
        return None
def verify_token(token: str, timeout: float = 10.0) -> bool:
    """Check an access token by calling Microsoft Graph ``/v1.0/me`` with it.

    Args:
        token: Access token to verify.
        timeout: Seconds to wait for Graph before giving up. Without a
            timeout a stalled connection would block the caller indefinitely.

    Returns:
        True when Graph answers 200; False for an empty token, any other
        status code, or a failed request.
    """
    if not token:
        # An empty token can never be valid; skip the network round-trip.
        return False

    import requests
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    try:
        logger.info("Verifying token validity...")
        response = requests.get(
            'https://graph.microsoft.com/v1.0/me',
            headers=headers,
            timeout=timeout
        )
        if response.status_code == 200:
            logger.info("Token verification successful")
            return True
        logger.error(f"Token verification failed: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        # Best-effort check: network and timeout errors count as "not valid".
        logger.error(f"Exception verifying token: {str(e)}")
        return False
def refresh_token(user_id: str) -> bool:
    """Refresh the user's access token using the stored refresh token.

    Loads the user's token file, exchanges its refresh token through MSAL and
    writes the new access token (and the new refresh token, if one was
    returned) back to storage.

    Args:
        user_id: Identifier of the token file to refresh.

    Returns:
        True when a new access token was obtained and saved; False when no
        refresh token is stored or MSAL did not return an access token.
    """
    token_data = load_token_from_file(user_id)
    if not token_data or not token_data.get("refresh_token"):
        logger.warning("No refresh token available")
        return False

    msal_app = msal.ConfidentialClientApplication(
        app_config["client_id"],
        authority=app_config["authority"],
        client_credential=app_config["client_credential"]
    )
    result = msal_app.acquire_token_by_refresh_token(
        token_data["refresh_token"],
        scopes=SCOPES
    )
    # Success is defined by the presence of "access_token"; checking only for
    # "error" would raise KeyError below on an unexpected response shape.
    if "error" in result or "access_token" not in result:
        logger.error(f"Error refreshing token: {result.get('error')}")
        return False

    # Update tokens in storage
    token_data["access_token"] = result["access_token"]
    if "refresh_token" in result:
        token_data["refresh_token"] = result["refresh_token"]
    save_token_to_file(user_id, token_data)
    logger.info("Access token refreshed successfully")
    return True
def silent_login(user_id: str) -> bool:
    """Try to log a user in silently by redeeming their stored refresh token.

    Args:
        user_id: Identifier of the token file to use.

    Returns:
        True when a new access token was obtained and saved; False when no
        refresh token is stored or MSAL did not return an access token.
    """
    token_data = load_token_from_file(user_id)
    if not token_data or not token_data.get("refresh_token"):
        logger.info(f"No refresh token found for user: {user_id}")
        return False

    # Try to refresh the token
    msal_app = msal.ConfidentialClientApplication(
        app_config["client_id"],
        authority=app_config["authority"],
        client_credential=app_config["client_credential"]
    )
    result = msal_app.acquire_token_by_refresh_token(
        token_data["refresh_token"],
        scopes=SCOPES
    )
    # Success is defined by the presence of "access_token"; checking only for
    # "error" would raise KeyError below on an unexpected response shape.
    if "error" in result or "access_token" not in result:
        logger.error(f"Error refreshing token: {result.get('error')}")
        return False

    # Update tokens in storage
    token_data["access_token"] = result["access_token"]
    if "refresh_token" in result:
        token_data["refresh_token"] = result["refresh_token"]
    save_token_to_file(user_id, token_data)
    return True
@router.get("/login")
async def login():
    """Start the Microsoft sign-in flow.

    Builds the Azure AD authorization URL for SCOPES and redirects the caller
    to it. No application user context is required.

    Raises:
        HTTPException: 500 when the authorization URL cannot be built.
    """
    # NOTE(review): the OAuth `state` below is a fixed literal, so the callback
    # cannot tie a response to the request that started it — consider a
    # per-request random value.
    try:
        client_id = app_config["client_id"]
        confidential_client = msal.ConfidentialClientApplication(
            client_id,
            authority=app_config["authority"],
            client_credential=app_config["client_credential"],
        )
        authorization_url = confidential_client.get_authorization_request_url(
            SCOPES,
            state="anonymous-user",
            redirect_uri=app_config["redirect_uri"],
        )
        # Only the first 60 characters of the URL are logged
        logger.info(f"Redirecting to Microsoft login: {authorization_url[:60]}...")
        return RedirectResponse(authorization_url)
    except Exception as exc:
        failure = f"Error initiating Microsoft login: {str(exc)}"
        logger.error(failure)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=failure,
        )
def _is_safe_token_id(candidate: Any) -> bool:
    """Return True if *candidate* can be used as a file stem inside TOKEN_DIR."""
    if candidate is None:
        return False
    text = str(candidate)
    return bool(text) and not any(ch in text for ch in ("/", "\\", "\0"))


def _generic_user_id() -> str:
    """Build a timestamp-based placeholder ID for logins without user context."""
    return f"user_{datetime.now().strftime('%Y%m%d%H%M%S')}"


# Static page returned after a successful login. Its script notifies the
# opener window (if any) via postMessage that authentication completed.
_AUTH_SUCCESS_HTML = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Authentication Successful</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; text-align: center; }
.success-container { max-width: 600px; margin: 0 auto; }
h1 { color: #0078d4; }
.success-icon { font-size: 72px; color: #107c10; margin: 20px 0; }
.button { display: inline-block; background-color: #0078d4; color: white;
padding: 10px 20px; text-decoration: none; border-radius: 4px;
font-weight: bold; margin-top: 20px; }
</style>
</head>
<body>
<div class="success-container">
<h1>Authentication Successful</h1>
<div class="success-icon">✓</div>
<p>You have successfully authenticated with Microsoft.</p>
<p>You can now close this tab and return to the application.</p>
<p>Your email templates will now be able to create drafts in your mailbox.</p>
<a href="javascript:window.close()" class="button">Close Window</a>
</div>
<script>
// Attempt to notify the opener window that authentication is complete
if (window.opener && !window.opener.closed) {
try {
window.opener.postMessage({ type: 'msft_auth_complete', success: true }, '*');
} catch (e) {
console.error('Error notifying opener:', e);
}
}
</script>
</body>
</html>
"""


@router.get("/auth/callback")
async def auth_callback(request: Request, code: Optional[str] = None, state: Optional[str] = None):
    """Handle the redirect back from Microsoft login.

    Exchanges the authorization code for tokens, derives the storage ID for
    the user, saves the tokens and returns a small HTML success page.

    Args:
        request: Incoming request (not used by the handler itself).
        code: Authorization code supplied by Microsoft.
        state: Either "anonymous-user" or "<mandate_id>:<user_id>".

    Returns:
        HTMLResponse on success; JSONResponse with status 400 when the code is
        missing, token acquisition fails, the ID token claims are absent or
        the resulting user ID is unusable; JSONResponse with status 500 on any
        unexpected error.
    """
    try:
        # Log callback for debugging
        logger.info("Received callback from Microsoft login")
        if not code:
            logger.error("No authorization code received in callback")
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"message": "No authorization code received"}
            )

        # Extract user and mandate info from state if available
        user_id = None
        mandate_id = None
        if state and state != "anonymous-user":
            try:
                mandate_id, user_id = state.split(":")
                logger.info(f"State contains mandate_id: {mandate_id}, user_id: {user_id}")
            except ValueError:
                logger.warning(f"Invalid state format: {state}")
                # Generate a generic user ID if state is invalid
                user_id = _generic_user_id()
        else:
            # For anonymous authentication, create a generic user ID
            logger.info("Anonymous authentication (no user context)")
            user_id = _generic_user_id()

        # Create a confidential client application
        msal_app = msal.ConfidentialClientApplication(
            app_config["client_id"],
            authority=app_config["authority"],
            client_credential=app_config["client_credential"]
        )
        # Get tokens using the authorization code
        result = msal_app.acquire_token_by_authorization_code(
            code,
            scopes=SCOPES,
            redirect_uri=app_config["redirect_uri"]
        )
        if "error" in result:
            logger.error(f"Error acquiring token: {result.get('error')}")
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"message": f"Error acquiring token: {result.get('error_description', result.get('error'))}"}
            )

        if "id_token_claims" not in result:
            logger.warning("No id_token_claims found in result")
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"message": "Failed to retrieve user information"}
            )

        # Store user information
        claims = result["id_token_claims"]
        user_info = {
            "name": claims.get("name", ""),
            "email": claims.get("preferred_username", ""),
        }
        # Prefer the identity asserted by the token over anything from state
        token_user_id = claims.get("oid") or claims.get("sub")
        if token_user_id:
            user_id = token_user_id
        elif not user_id and user_info.get("email"):
            # Fall back to email-based ID if no other ID is available
            user_id = user_info.get("email", "user").replace("@", "_").replace(".", "_")

        # user_id may still come from the caller-controlled `state` parameter
        # and is about to become a file name: refuse IDs that could leave
        # TOKEN_DIR.
        if not _is_safe_token_id(user_id):
            logger.error("Rejected unsafe or empty user id in auth callback")
            return JSONResponse(
                status_code=status.HTTP_400_BAD_REQUEST,
                content={"message": "Invalid user identifier"}
            )

        token_data = {
            "access_token": result["access_token"],
            "refresh_token": result.get("refresh_token", ""),
            "user_info": user_info,
            "timestamp": datetime.now().isoformat()
        }
        # Ensure token directory exists, then store through the shared helper
        # so every token file is written the same way.
        os.makedirs(TOKEN_DIR, exist_ok=True)
        save_token_to_file(user_id, token_data)
        logger.info(f"User authenticated: {user_info.get('email', 'unknown')}")

        return HTMLResponse(content=_AUTH_SUCCESS_HTML)
    except Exception as e:
        logger.error(f"Error in auth callback: {str(e)}", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"message": f"Error in auth callback: {str(e)}"}
        )
@router.get("/status")
async def auth_status(
    msft_user_id: Optional[str] = Cookie(None),
    currentUser: Dict[str, Any] = Depends(getCurrentActiveUser)
):
    """Report whether the caller has a usable Microsoft token.

    The token file is looked up by the ``msft_user_id`` cookie when present,
    otherwise by the user ID obtained from ``getUserContext(currentUser)``.
    An invalid access token is refreshed once before giving up.

    Returns:
        JSONResponse with ``authenticated`` (bool), ``message`` and, when
        authenticated, ``user`` (the stored user_info); status 500 with a
        ``message`` on unexpected errors.
    """
    # NOTE(review): the cookie value is client-controlled and is used as the
    # lookup key as-is, so a caller can query the status of any stored ID —
    # confirm this is intended.
    try:
        # Get user ID
        if msft_user_id:
            user_id = msft_user_id
        else:
            _mandate_id, context_user_id = await getUserContext(currentUser)
            user_id = str(context_user_id)

        # The ID becomes part of a file path; an ID with path characters can
        # never match a stored token, so answer "not authenticated" directly.
        if any(ch in user_id for ch in ("/", "\\", "\0")):
            logger.warning("Rejected Microsoft user id containing path characters")
            return JSONResponse(
                content={"authenticated": False, "message": "Not authenticated with Microsoft"}
            )

        # Check if user has a token
        token_data = load_token_from_file(user_id)
        if not token_data:
            return JSONResponse(
                content={"authenticated": False, "message": "Not authenticated with Microsoft"}
            )

        # Token is valid, return user info
        if verify_token(token_data.get("access_token", "")):
            return JSONResponse(
                content={
                    "authenticated": True,
                    "message": "Authenticated with Microsoft",
                    "user": token_data.get("user_info", {})
                }
            )

        # Token is not valid: try to refresh it
        if not refresh_token(user_id):
            return JSONResponse(
                content={
                    "authenticated": False,
                    "message": "Token expired and couldn't be refreshed"
                }
            )

        # Reload after refresh; fall back to an empty dict if the file vanished
        # in the meantime instead of failing on None.get(...).
        refreshed = load_token_from_file(user_id) or {}
        return JSONResponse(
            content={
                "authenticated": True,
                "message": "Token refreshed successfully",
                "user": refreshed.get("user_info", {})
            }
        )
    except Exception as e:
        logger.error(f"Error checking auth status: {str(e)}")
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"message": f"Error checking auth status: {str(e)}"}
        )