gateway/modules/security/tokenManager.py

145 lines
No EOL
6.5 KiB
Python

"""
Token Manager Service
Handles all token operations including automatic refresh for backend services.
"""
import logging
import httpx
from datetime import datetime
from typing import Optional, Dict, Any
from modules.interfaces.interfaceAppModel import Token, AuthAuthority
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import get_utc_timestamp, create_expiration_timestamp
logger = logging.getLogger(__name__)
class TokenManager:
"""Centralized token management service"""
def __init__(self):
# Microsoft OAuth configuration
self.msft_client_id = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
self.msft_client_secret = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
self.msft_tenant_id = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")
# Google OAuth configuration
self.google_client_id = APP_CONFIG.get("Service_GOOGLE_CLIENT_ID")
self.google_client_secret = APP_CONFIG.get("Service_GOOGLE_CLIENT_SECRET")
def refresh_microsoft_token(self, refresh_token: str, user_id: str, old_token: Token) -> Optional[Token]:
"""Refresh Microsoft OAuth token using refresh token"""
try:
if not self.msft_client_id or not self.msft_client_secret:
logger.error("Microsoft OAuth configuration not found")
return None
# Microsoft token refresh endpoint
token_url = f"https://login.microsoftonline.com/{self.msft_tenant_id}/oauth2/v2.0/token"
# Prepare refresh request
data = {
"client_id": self.msft_client_id,
"client_secret": self.msft_client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "Mail.ReadWrite Mail.Send Mail.ReadWrite.Shared User.Read"
}
# Make refresh request
with httpx.Client(timeout=30.0) as client:
response = client.post(token_url, data=data)
if response.status_code == 200:
token_data = response.json()
# Create new token
new_token = Token(
userId=user_id,
authority=AuthAuthority.MSFT,
connectionId=old_token.connectionId, # Preserve connection ID
tokenAccess=token_data["access_token"],
tokenRefresh=token_data.get("refresh_token", refresh_token), # Keep old refresh token if new one not provided
tokenType=token_data.get("token_type", "bearer"),
expiresAt=create_expiration_timestamp(token_data.get("expires_in", 3600)),
createdAt=get_utc_timestamp()
)
return new_token
else:
logger.error(f"Failed to refresh Microsoft token: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error refreshing Microsoft token: {str(e)}")
return None
def refresh_google_token(self, refresh_token: str, user_id: str, old_token: Token) -> Optional[Token]:
"""Refresh Google OAuth token using refresh token"""
try:
if not self.google_client_id or not self.google_client_secret:
logger.error("Google OAuth configuration not found")
return None
# Google token refresh endpoint
token_url = "https://oauth2.googleapis.com/token"
# Prepare refresh request
data = {
"client_id": self.google_client_id,
"client_secret": self.google_client_secret,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"scope": "https://www.googleapis.com/auth/gmail.readonly https://www.googleapis.com/auth/userinfo.profile https://www.googleapis.com/auth/userinfo.email openid"
}
# Make refresh request
with httpx.Client(timeout=30.0) as client:
response = client.post(token_url, data=data)
if response.status_code == 200:
token_data = response.json()
# Create new token
new_token = Token(
userId=user_id,
authority=AuthAuthority.GOOGLE,
connectionId=old_token.connectionId, # Preserve connection ID
tokenAccess=token_data["access_token"],
tokenRefresh=refresh_token, # Google doesn't always provide new refresh token
tokenType=token_data.get("token_type", "bearer"),
expiresAt=create_expiration_timestamp(token_data.get("expires_in", 3600)),
createdAt=get_utc_timestamp()
)
return new_token
else:
logger.error(f"Failed to refresh Google token: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error refreshing Google token: {str(e)}")
return None
def refresh_token(self, old_token: Token) -> Optional[Token]:
"""Refresh an expired token using the appropriate OAuth service"""
try:
if not old_token.tokenRefresh:
logger.warning(f"No refresh token available for {old_token.authority}")
return None
# Route to appropriate refresh method
if old_token.authority == AuthAuthority.MSFT:
return self.refresh_microsoft_token(old_token.tokenRefresh, old_token.userId, old_token)
elif old_token.authority == AuthAuthority.GOOGLE:
return self.refresh_google_token(old_token.tokenRefresh, old_token.userId, old_token)
else:
logger.warning(f"Unknown authority for token refresh: {old_token.authority}")
return None
except Exception as e:
logger.error(f"Error refreshing token: {str(e)}")
return None