"""
Token Manager Service

Handles all token operations including automatic refresh for backend services.
"""

import logging
from datetime import datetime
from typing import Any, Dict, Optional

import httpx

from modules.interfaces.interfaceAppModel import AuthAuthority, Token
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import (
    create_expiration_timestamp,
    get_expires_in_seconds,
    get_utc_timestamp,
    is_expired_utc,
)

logger = logging.getLogger(__name__)


class TokenManager:
    """Centralized token management service.

    Refreshes Microsoft and Google OAuth tokens through their token
    endpoints and reports expiry status for stored tokens. Every refresh
    method returns ``None`` on failure (after logging) instead of raising.
    """

    # Timeout, in seconds, applied to each token-endpoint HTTP request.
    REQUEST_TIMEOUT_SECONDS = 30.0
    # Lifetime assumed when the provider response carries no "expires_in".
    DEFAULT_EXPIRES_IN_SECONDS = 3600
    # Tokens with at most this many seconds left are flagged "expires_soon".
    EXPIRES_SOON_THRESHOLD_SECONDS = 3600  # 1 hour

    def __init__(self):
        # Microsoft OAuth configuration
        self.msft_client_id = APP_CONFIG.get("Service_MSFT_CLIENT_ID")
        self.msft_client_secret = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET")
        self.msft_tenant_id = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common")

        # Google OAuth configuration
        self.google_client_id = APP_CONFIG.get("Service_GOOGLE_CLIENT_ID")
        self.google_client_secret = APP_CONFIG.get("Service_GOOGLE_CLIENT_SECRET")

    def _request_refreshed_token(
        self,
        provider_name: str,
        authority: Any,
        token_url: str,
        data: Dict[str, Any],
        refresh_token: str,
        user_id: str,
        old_token: Token,
    ) -> Optional[Token]:
        """POST a refresh request and build the replacement ``Token``.

        Args:
            provider_name: Human-readable provider name used in log messages.
            authority: ``AuthAuthority`` member stored on the new token.
            token_url: OAuth token endpoint to post to.
            data: Form fields of the refresh request.
            refresh_token: Refresh token that was sent; kept on the new token
                when the response does not supply a replacement.
            user_id: Owner of the new token.
            old_token: Token being replaced; its ``connectionId`` is preserved.

        Returns:
            The new ``Token``, or ``None`` when the endpoint answers with a
            non-200 status or any error occurs while requesting or parsing.
        """
        try:
            with httpx.Client(timeout=self.REQUEST_TIMEOUT_SECONDS) as client:
                response = client.post(token_url, data=data)

                if response.status_code != 200:
                    logger.error(
                        "Failed to refresh %s token: %s - %s",
                        provider_name,
                        response.status_code,
                        response.text,
                    )
                    return None

                token_data = response.json()

                return Token(
                    userId=user_id,
                    authority=authority,
                    connectionId=old_token.connectionId,  # Preserve connection ID
                    tokenAccess=token_data["access_token"],
                    # A provider may rotate the refresh token; keep the old one
                    # only when the response has none (missing, null or empty).
                    tokenRefresh=token_data.get("refresh_token") or refresh_token,
                    tokenType=token_data.get("token_type", "bearer"),
                    expiresAt=create_expiration_timestamp(
                        token_data.get("expires_in", self.DEFAULT_EXPIRES_IN_SECONDS)
                    ),
                    createdAt=get_utc_timestamp(),
                )
        except Exception as e:
            # Deliberate best-effort boundary: network errors, malformed JSON
            # and a missing "access_token" all surface to callers as None.
            logger.error("Error refreshing %s token: %s", provider_name, e)
            return None

    def refresh_microsoft_token(self, refresh_token: str, user_id: str, old_token: Token) -> Optional[Token]:
        """Refresh a Microsoft OAuth token using its refresh token.

        Args:
            refresh_token: Refresh token to exchange.
            user_id: Owner of the resulting token.
            old_token: Token being replaced; its ``connectionId`` is preserved.

        Returns:
            The refreshed ``Token``, or ``None`` when the Microsoft client
            configuration is missing or the refresh fails.
        """
        try:
            if not self.msft_client_id or not self.msft_client_secret:
                logger.error("Microsoft OAuth configuration not found")
                return None

            # Microsoft token refresh endpoint
            token_url = f"https://login.microsoftonline.com/{self.msft_tenant_id}/oauth2/v2.0/token"

            # Prepare refresh request
            data = {
                "client_id": self.msft_client_id,
                "client_secret": self.msft_client_secret,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "scope": "Mail.ReadWrite Mail.Send Mail.ReadWrite.Shared User.Read",
            }
        except Exception as e:
            logger.error(f"Error refreshing Microsoft token: {str(e)}")
            return None

        return self._request_refreshed_token(
            "Microsoft", AuthAuthority.MSFT, token_url, data, refresh_token, user_id, old_token
        )

    def refresh_google_token(self, refresh_token: str, user_id: str, old_token: Token) -> Optional[Token]:
        """Refresh a Google OAuth token using its refresh token.

        Args:
            refresh_token: Refresh token to exchange.
            user_id: Owner of the resulting token.
            old_token: Token being replaced; its ``connectionId`` is preserved.

        Returns:
            The refreshed ``Token``, or ``None`` when the Google client
            configuration is missing or the refresh fails. The supplied
            refresh token is kept unless the response provides a new one.
        """
        try:
            if not self.google_client_id or not self.google_client_secret:
                logger.error("Google OAuth configuration not found")
                return None

            # Google token refresh endpoint
            token_url = "https://oauth2.googleapis.com/token"

            # Prepare refresh request
            data = {
                "client_id": self.google_client_id,
                "client_secret": self.google_client_secret,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "scope": "https://www.googleapis.com/auth/gmail.readonly https://www.googleapis.com/auth/userinfo.profile https://www.googleapis.com/auth/userinfo.email openid",
            }
        except Exception as e:
            logger.error(f"Error refreshing Google token: {str(e)}")
            return None

        return self._request_refreshed_token(
            "Google", AuthAuthority.GOOGLE, token_url, data, refresh_token, user_id, old_token
        )

    def refresh_token(self, old_token: Token) -> Optional[Token]:
        """Refresh a token using the OAuth service matching its authority.

        Args:
            old_token: Token to refresh; must carry a ``tokenRefresh`` value.

        Returns:
            The refreshed ``Token``, or ``None`` when there is no refresh
            token, the authority is not MSFT/GOOGLE, or the refresh fails.
        """
        try:
            if not old_token.tokenRefresh:
                logger.warning(f"No refresh token available for {old_token.authority}")
                return None

            # Route to appropriate refresh method
            if old_token.authority == AuthAuthority.MSFT:
                return self.refresh_microsoft_token(old_token.tokenRefresh, old_token.userId, old_token)
            if old_token.authority == AuthAuthority.GOOGLE:
                return self.refresh_google_token(old_token.tokenRefresh, old_token.userId, old_token)

            logger.warning(f"Unknown authority for token refresh: {old_token.authority}")
            return None
        except Exception as e:
            logger.error(f"Error refreshing token: {str(e)}")
            return None

    def is_token_expired(self, token: Token) -> bool:
        """Return whether *token* is expired.

        A token without an ``expiresAt`` value is treated as not expired.
        """
        if not token.expiresAt:
            return False
        return is_expired_utc(token.expiresAt)

    def get_token_status(self, token: Token) -> Dict[str, Any]:
        """Return status information for *token*.

        Returns:
            A dict with the keys ``status`` ("valid" or "expired"),
            ``expires_at``, ``expires_in_seconds`` and ``expires_soon``
            (always a bool). A token without ``expiresAt`` is reported as
            valid with no expiry data.
        """
        if not token.expiresAt:
            return {
                "status": "valid",
                "expires_at": None,
                "expires_in_seconds": None,
                "expires_soon": False,
            }

        expires_in = get_expires_in_seconds(token.expiresAt)
        # Compare against None explicitly: a plain truthiness test treats a
        # remaining lifetime of exactly 0 seconds as "unknown" and would
        # report a token that has just run out as valid.
        # NOTE(review): None is handled defensively; confirm whether
        # get_expires_in_seconds can actually return it.
        has_value = expires_in is not None
        is_expired = has_value and expires_in <= 0
        expires_soon = has_value and expires_in <= self.EXPIRES_SOON_THRESHOLD_SECONDS

        return {
            "status": "expired" if is_expired else "valid",
            "expires_at": token.expiresAt,
            "expires_in_seconds": expires_in,
            "expires_soon": expires_soon,
        }