""" Google interface for handling Google authentication and API operations. """ import logging import requests from typing import Dict, Any, Optional, Tuple from datetime import datetime import secrets from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import Flow from google.auth.transport.requests import Request import os from modules.shared.configuration import APP_CONFIG from modules.interfaces.googleModel import GoogleToken, GoogleUserInfo, GoogleConfig from modules.connectors.connectorDbJson import DatabaseConnector from modules.interfaces.googleAccess import GoogleAccess from modules.interfaces.gatewayInterface import getRootUser logger = logging.getLogger(__name__) # Singleton factory for GoogleInterface instances per context _googleInterfaces = {} # Root interface instance _rootGoogleInterface = None class GoogleInterface: """Interface for Google authentication and API operations""" def __init__(self, currentUser: Dict[str, Any] = None): """Initialize the Google interface""" # Initialize variables self.currentUser = currentUser self.mandateId = currentUser.get("mandateId") if currentUser else None self.userId = currentUser.get("id") if currentUser else None self.access = None # Will be set when user context is provided # Initialize configuration self.clientId = APP_CONFIG.get("Service_GOOGLE_CLIENT_ID") self.clientSecret = APP_CONFIG.get("Service_GOOGLE_CLIENT_SECRET") self.redirectUri = APP_CONFIG.get("Service_GOOGLE_REDIRECT_URI") self.authorityUrl = "https://accounts.google.com" self.tokenUrl = "https://oauth2.googleapis.com/token" self.userInfoUrl = "https://www.googleapis.com/oauth2/v3/userinfo" self.scopes = ["openid", "profile", "email"] # Initialize database self._initializeDatabase() # Initialize OAuth2 flow self.flow = Flow.from_client_config( { "web": { "client_id": self.clientId, "client_secret": self.clientSecret, "auth_uri": f"{self.authorityUrl}/o/oauth2/auth", "token_uri": self.tokenUrl, "redirect_uris": [self.redirectUri] } }, scopes=self.scopes ) # Set user context if provided if currentUser: self.setUserContext(currentUser) def _initializeDatabase(self): """Initializes the database connection.""" try: # Get configuration values with defaults dbHost = APP_CONFIG.get("DB_GOOGLE_HOST", "data") dbDatabase = APP_CONFIG.get("DB_GOOGLE_DATABASE", "google") dbUser = APP_CONFIG.get("DB_GOOGLE_USER") dbPassword = APP_CONFIG.get("DB_GOOGLE_PASSWORD_SECRET") # Ensure the database directory exists os.makedirs(dbHost, exist_ok=True) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword, mandateId=self.mandateId, userId=self.userId ) # Set context self.db.updateContext(self.mandateId, self.userId) logger.info("Database initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {str(e)}") raise def initiateLogin(self) -> str: """Initiate Google login flow""" try: # Generate auth URL auth_url, _ = self.flow.authorization_url( access_type="offline", include_granted_scopes="true", state=self._generateState() ) return auth_url except Exception as e: logger.error(f"Error initiating Google login: {str(e)}") return None def handleAuthCallback(self, code: str) -> Optional[GoogleToken]: """Handle Google OAuth callback""" try: # Exchange code for token self.flow.fetch_token(code=code) credentials = self.flow.credentials # Get user info user_info = self.getUserInfoFromToken(credentials.token) if not user_info: return None # Create token model token = GoogleToken( access_token=credentials.token, refresh_token=credentials.refresh_token, expires_in=credentials.expiry.timestamp() - datetime.now().timestamp(), token_type=credentials.token_type, expires_at=credentials.expiry.timestamp(), user_info=user_info.model_dump(), mandateId=self.mandateId, userId=self.userId ) return token except Exception as e: logger.error(f"Error handling auth callback: {str(e)}") return None def verifyToken(self, token: str) -> bool: """Verify Google token""" try: # Get user info from token user_info = self.getUserInfoFromToken(token) if not user_info: return False # Get current user's Google connection user = self.db.getRecordset("users", recordFilter={"id": self.userId})[0] google_connection = next((conn for conn in user.get("connections", []) if conn.get("authority") == "google"), None) if not google_connection: return False # Verify the token belongs to this user return user_info.id == google_connection.get("externalId") except Exception as e: logger.error(f"Error verifying Google token: {str(e)}") return False def getUserInfoFromToken(self, token: str) -> Optional[GoogleUserInfo]: """Get user info from Google API""" try: # Call Google API response = requests.get( self.userInfoUrl, headers={"Authorization": f"Bearer {token}"} ) if response.status_code != 200: logger.error(f"Failed to get user info: {response.text}") return None data = response.json() # Create user info model return GoogleUserInfo( id=data["sub"], # Google uses 'sub' as the unique identifier email=data["email"], name=data.get("name", ""), picture=data.get("picture") # Google provides profile picture URL ) except Exception as e: logger.error(f"Error getting user info: {str(e)}") return None def refreshToken(self, refresh_token: str) -> Optional[GoogleToken]: """Refresh Google token""" try: # Create credentials object credentials = Credentials( None, # No access token refresh_token=refresh_token, token_uri=self.tokenUrl, client_id=self.clientId, client_secret=self.clientSecret ) # Refresh token credentials.refresh(Request()) # Get user info user_info = self.getUserInfoFromToken(credentials.token) if not user_info: return None # Create token model token = GoogleToken( access_token=credentials.token, refresh_token=credentials.refresh_token or refresh_token, expires_in=credentials.expiry.timestamp() - datetime.now().timestamp(), token_type=credentials.token_type, expires_at=credentials.expiry.timestamp(), user_info=user_info.model_dump(), mandateId=self.mandateId, userId=self.userId ) return token except Exception as e: logger.error(f"Error refreshing token: {str(e)}") return None def _generateState(self) -> str: """Generate secure state token""" return secrets.token_urlsafe(32) def setUserContext(self, currentUser: Dict[str, Any]): """Set user context for the interface""" if not currentUser: logger.info("Initializing interface without user context") return self.currentUser = currentUser self.mandateId = currentUser.get("mandateId") self.userId = currentUser.get("id") if not self.mandateId or not self.userId: raise ValueError("Invalid user context: mandateId and id are required") # Initialize access control with user context self.access = GoogleAccess(self.currentUser, self.db) # Update database context self.db.updateContext(self.mandateId, self.userId) logger.debug(f"User context set: userId={self.userId}") def getRootInterface() -> GoogleInterface: """ Returns a GoogleInterface instance with root privileges. This is used for initial setup and user creation. """ global _rootGoogleInterface if _rootGoogleInterface is None: # Get root user from gateway rootUser = getRootUser() _rootGoogleInterface = GoogleInterface(rootUser) return _rootGoogleInterface def getInterface(currentUser: Dict[str, Any] = None) -> GoogleInterface: """ Returns a GoogleInterface instance. If currentUser is provided, initializes with user context. Otherwise, returns an instance with only database access. """ # Create new instance if not exists if "default" not in _googleInterfaces: _googleInterfaces["default"] = GoogleInterface(currentUser or {}) interface = _googleInterfaces["default"] if currentUser: interface.setUserContext(currentUser) else: logger.info("Returning interface without user context") return interface