""" Microsoft interface for handling Microsoft authentication and Graph API operations. """ import logging import json import requests import base64 import msal from typing import Dict, Any, Optional, List from datetime import datetime, timedelta import secrets import os from modules.shared.configuration import APP_CONFIG from modules.interfaces.msftModel import MsftToken, MsftUserInfo from modules.connectors.connectorDbJson import DatabaseConnector from modules.interfaces.msftAccess import MsftAccess logger = logging.getLogger(__name__) # Singleton factory for MsftInterface instances per context _msftInterfaces = {} class MsftInterface: """Interface for Microsoft authentication and Graph API operations""" def __init__(self, currentUser: Dict[str, Any]): """Initialize the Microsoft interface""" self.currentUser = currentUser self._mandateId = currentUser.get("_mandateId") self._userId = currentUser.get("id") if not self._mandateId or not self._userId: raise ValueError("Invalid user context: _mandateId and id are required") # Initialize configuration self.client_id = APP_CONFIG.get("Service_MSFT_CLIENT_ID") self.client_secret = APP_CONFIG.get("Service_MSFT_CLIENT_SECRET") self.tenant_id = APP_CONFIG.get("Service_MSFT_TENANT_ID", "common") self.redirect_uri = APP_CONFIG.get("Service_MSFT_REDIRECT_URI") self.authority = f"https://login.microsoftonline.com/{self.tenant_id}" self.scopes = ["Mail.ReadWrite", "User.Read"] # Initialize database self._initializeDatabase() # Initialize access control self.access = MsftAccess(self.currentUser, self.db) # Initialize MSAL application self.msal_app = msal.ConfidentialClientApplication( self.client_id, authority=self.authority, client_credential=self.client_secret ) def _initializeDatabase(self): """Initializes the database connection.""" try: # Get configuration values with defaults dbHost = APP_CONFIG.get("DB_MSFT_HOST", "data") dbDatabase = APP_CONFIG.get("DB_MSFT_DATABASE", "msft") dbUser = APP_CONFIG.get("DB_MSFT_USER") dbPassword = APP_CONFIG.get("DB_MSFT_PASSWORD_SECRET") # Ensure the database directory exists os.makedirs(dbHost, exist_ok=True) self.db = DatabaseConnector( dbHost=dbHost, dbDatabase=dbDatabase, dbUser=dbUser, dbPassword=dbPassword, _mandateId=self._mandateId, _userId=self._userId ) # Set context self.db.updateContext(self._mandateId, self._userId) logger.info("Database initialized successfully") except Exception as e: logger.error(f"Failed to initialize database: {str(e)}") raise def _uam(self, table: str, recordset: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ Unified user access management function that filters data based on user privileges and adds access control attributes. Args: table: Name of the table recordset: Recordset to filter based on access rules Returns: Filtered recordset with access control attributes """ return self.access.uam(table, recordset) def _canModify(self, table: str, recordId: Optional[str] = None) -> bool: """ Checks if the current user can modify (create/update/delete) records in a table. Args: table: Name of the table recordId: Optional record ID for specific record check Returns: Boolean indicating permission """ return self.access.canModify(table, recordId) def getMsftToken(self) -> Optional[Dict[str, Any]]: """Get Microsoft token for current user""" try: tokens = self.db.getRecordset("msftTokens", recordFilter={ "_mandateId": self._mandateId, "_userId": self._userId }) if not tokens: return None # Apply access control filtered_tokens = self._uam("msftTokens", tokens) if not filtered_tokens: return None return filtered_tokens[0] except Exception as e: logger.error(f"Error getting Microsoft token: {str(e)}") return None def saveMsftToken(self, token_data: Dict[str, Any]) -> bool: """Save Microsoft token data""" try: # Check if user can modify tokens if not self._canModify("msftTokens"): raise PermissionError("No permission to save Microsoft token") # Add user and mandate IDs to token data token_data["_mandateId"] = self._mandateId token_data["_userId"] = self._userId # Check if token already exists existing_token = self.getMsftToken() if existing_token: # Update existing token return self.db.recordUpdate("msftTokens", existing_token["id"], token_data) else: # Create new token record return self.db.recordCreate("msftTokens", token_data) except Exception as e: logger.error(f"Error saving Microsoft token: {str(e)}") return False def deleteMsftToken(self) -> bool: """Delete Microsoft token for current user""" try: if not self._canModify("msftTokens"): raise PermissionError("No permission to delete Microsoft token") existing_token = self.getMsftToken() if existing_token: return self.db.recordDelete("msftTokens", existing_token["id"]) return True except Exception as e: logger.error(f"Error deleting Microsoft token: {str(e)}") return False def getCurrentUserToken(self) -> tuple: """Get current user's Microsoft token and info""" try: token_data = self.getMsftToken() if not token_data: return None, None # Verify token is still valid if not self.verifyToken(token_data.get("access_token")): if not self.refreshToken(token_data): return None, None token_data = self.getMsftToken() user_info = token_data.get("user_info") if not user_info: user_info = self.getUserInfoFromToken(token_data.get("access_token")) if user_info: token_data["user_info"] = user_info self.saveMsftToken(token_data) return user_info, token_data.get("access_token") except Exception as e: logger.error(f"Error getting current user token: {str(e)}") return None, None def verifyToken(self, token: str) -> bool: """Verify the access token is valid""" try: headers = { 'Authorization': f'Bearer {token}', 'Content-Type': 'application/json' } response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers) return response.status_code == 200 except Exception as e: logger.error(f"Error verifying token: {str(e)}") return False def refreshToken(self, token_data: Dict[str, Any]) -> bool: """Refresh the access token using the stored refresh token""" try: if not token_data or not token_data.get("refresh_token"): return False result = self.msal_app.acquire_token_by_refresh_token( token_data["refresh_token"], scopes=self.scopes ) if "error" in result: logger.error(f"Error refreshing token: {result.get('error')}") return False token_data["access_token"] = result["access_token"] if "refresh_token" in result: token_data["refresh_token"] = result["refresh_token"] return self.saveMsftToken(token_data) except Exception as e: logger.error(f"Error refreshing token: {str(e)}") return False def getUserInfoFromToken(self, access_token: str) -> Optional[Dict[str, Any]]: """Get user information using the access token""" try: headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } response = requests.get('https://graph.microsoft.com/v1.0/me', headers=headers) if response.status_code == 200: user_data = response.json() return { "name": user_data.get("displayName", ""), "email": user_data.get("userPrincipalName", ""), "id": user_data.get("id", "") } return None except Exception as e: logger.error(f"Error getting user info: {str(e)}") return None def createDraftEmail(self, recipient: str, subject: str, body: str, attachments: List[Dict[str, Any]] = None) -> bool: """Create a draft email using Microsoft Graph API""" try: user_info, access_token = self.getCurrentUserToken() if not user_info or not access_token: return False headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json' } email_data = { 'subject': subject, 'body': { 'contentType': 'HTML', 'content': body }, 'toRecipients': [ { 'emailAddress': { 'address': recipient } } ] } if attachments: email_data['attachments'] = [] for attachment in attachments: doc = attachment.get('document', {}) file_name = attachment.get('name', 'attachment.file') file_content = doc.get('data') if not file_content: continue mime_type = doc.get('mimeType', 'application/octet-stream') is_base64 = doc.get('base64Encoded', False) try: if is_base64: content_bytes = file_content else: if isinstance(file_content, str): content_bytes = base64.b64encode(file_content.encode('utf-8')).decode('utf-8') elif isinstance(file_content, bytes): content_bytes = base64.b64encode(file_content).decode('utf-8') else: continue decoded_size = len(base64.b64decode(content_bytes)) attachment_data = { '@odata.type': '#microsoft.graph.fileAttachment', 'name': file_name, 'contentType': mime_type, 'contentBytes': content_bytes, 'isInline': False, 'size': decoded_size } email_data['attachments'].append(attachment_data) except Exception as e: logger.error(f"Error processing attachment {file_name}: {str(e)}") continue response = requests.post( 'https://graph.microsoft.com/v1.0/me/messages', headers=headers, json=email_data ) return response.status_code >= 200 and response.status_code < 300 except Exception as e: logger.error(f"Error creating draft email: {str(e)}") return False def initiateLogin(self) -> str: """Initiate Microsoft login flow""" try: state = secrets.token_urlsafe(32) auth_url = self.msal_app.get_authorization_request_url( self.scopes, state=state, redirect_uri=self.redirect_uri ) return auth_url except Exception as e: logger.error(f"Error initiating login: {str(e)}") return None def handleAuthCallback(self, code: str) -> Optional[Dict[str, Any]]: """Handle Microsoft OAuth callback""" try: token_response = self.msal_app.acquire_token_by_authorization_code( code, self.scopes, redirect_uri=self.redirect_uri ) if "error" in token_response: logger.error(f"Token acquisition failed: {token_response['error']}") return None user_info = self.getUserInfoFromToken(token_response["access_token"]) if not user_info: return None token_response["user_info"] = user_info return token_response except Exception as e: logger.error(f"Error handling auth callback: {str(e)}") return None def getInterface(currentUser: Dict[str, Any]) -> MsftInterface: """ Returns a MsftInterface instance for the current user. Handles initialization of database and records. """ mandateId = currentUser.get("_mandateId") userId = currentUser.get("id") if not mandateId or not userId: raise ValueError("Invalid user context: _mandateId and id are required") # Create context key contextKey = f"{mandateId}_{userId}" # Create new instance if not exists if contextKey not in _msftInterfaces: _msftInterfaces[contextKey] = MsftInterface(currentUser) return _msftInterfaces[contextKey]